Selaa lähdekoodia

1. 使用固定币量交易
2. 先卖交易所后买链上

skyfffire 5 kuukautta sitten
vanhempi
commit
bd6c2e047c
6 muutettua tiedostoa jossa 222 lisäystä ja 113 poistoa
  1. 181 78
      arbitrage_process.py
  2. 13 16
      as.py
  3. 1 1
      mexc_client.py
  4. 3 2
      ok_chain_client.py
  5. 20 14
      price_checker_ok.py
  6. 4 2
      submit_process_demo.py

+ 181 - 78
arbitrage_process.py

@@ -40,7 +40,8 @@ def add_state_flow_entry(process_item, state_name, msg, status_val="pending"):
 
 
 class ArbitrageProcess:
 class ArbitrageProcess:
     def __init__(self, tx, gas_limit_multiplier, gas_price_multiplier, 
     def __init__(self, tx, gas_limit_multiplier, gas_price_multiplier, 
-                 from_token, to_token, from_token_amount_human, out_token_amount_human,
+                 from_token, to_token, 
+                 from_token_amount_human, exchange_out_amount,
                  user_exchange_wallet, user_wallet,
                  user_exchange_wallet, user_wallet,
                  symbol, process_item):
                  symbol, process_item):
         """
         """
@@ -71,33 +72,33 @@ class ArbitrageProcess:
         self.process_item = process_item
         self.process_item = process_item
 
 
         # 存储当前套利交易的细节信息,例如买入数量、价格等
         # 存储当前套利交易的细节信息,例如买入数量、价格等
-        chain_usdt_use = Decimal(from_token_amount_human)
         self.arbitrage_details = {
         self.arbitrage_details = {
-            "chain_buy_tx_hash": None,                  # 链上买入的tx hash
-            "chain_usdt_use": chain_usdt_use,           # 链上usdt减少量(使用量), todo, 暂用固定值代替
-            "out_token_amount_human": out_token_amount_human, # 链上可能会产生的out数量,交易所的代币剩余量一定要大于这个值
+            "chain_buy_tx_hash": None,                                          # 链上买入的tx hash
+            "chain_usdt_use":  Decimal(from_token_amount_human),                # 链上usdt减少量(使用量), todo, 暂用固定值代替
             "chain_amount_before_trade": 0,
             "chain_amount_before_trade": 0,
             "chain_amount_after_trade": 0,
             "chain_amount_after_trade": 0,
-            "chain_buy_amount": Decimal('0'),   # 链上币增加量(购入量), todo, 暂用即时余额代替
-            "chain_buy_price": None,                    # 链上购入价, todo
-            "chain_withdrawal_tx_hash": None,           # 链上转入交易所的tx
-            "exchange_sell_order_id": None,             # 交易所卖出id
-            "exchange_withdraw_id": None,               # 交易所提现id
-            "exchange_withdraw_amount": None,           # 交易所提现数量
+            "chain_buy_amount": Decimal('0'),                                   # 链上币增加量(购入量), todo, 暂用即时余额代替
+            "chain_buy_price": None,                                            # 链上购入价, todo
+            "chain_withdrawal_tx_hash": None,                                   # 链上转入交易所的tx
+            "exchange_out_amount": Decimal(exchange_out_amount),                # 交易所卖出量
+            "exchange_sell_order_id": None,                                     # 交易所卖出id
+            "exchange_withdraw_id": None,                                       # 交易所提现id
+            "exchange_withdraw_amount": None,                                   # 交易所提现数量
         } 
         } 
 
 
         # 定义可能的状态
         # 定义可能的状态
         self.STATES = [
         self.STATES = [
+            "SELLING_ON_EXCHANGE",          # 正在中心化交易所卖出现货
+            "WAITING_SELL_CONFIRM",         # 等待现货卖出订单确认
             "BUYING_ON_CHAIN",              # 正在链上买入
             "BUYING_ON_CHAIN",              # 正在链上买入
             "WAITING_CHAIN_CONFIRM",        # 等待链上交易确认
             "WAITING_CHAIN_CONFIRM",        # 等待链上交易确认
+            "WAITING_EXCHANGE_ROLLBACK",    # 等待交易所回滚
             # "HEDGING_ON_EXCHANGE",          # 正在中心化交易所套保
             # "HEDGING_ON_EXCHANGE",          # 正在中心化交易所套保
             # "WAITING_HEDGE_CONFIRM",        # 等待套保订单确认
             # "WAITING_HEDGE_CONFIRM",        # 等待套保订单确认
-            "TRANSFERRING_TO_EXCHANGE",     # 正在向交易所转账
-            "WAITING_TRANSFER_ARRIVE",      # 等待交易所充值到账
-            "SELLING_ON_EXCHANGE",          # 正在中心化交易所卖出现货
-            "WAITING_SELL_CONFIRM",         # 等待现货卖出订单确认
+            # "TRANSFERRING_TO_EXCHANGE",     # 正在向交易所转账
             # "CLOSING_HEDGE",                # 正在平掉套保单
             # "CLOSING_HEDGE",                # 正在平掉套保单
             # "WAITING_CLOSE_HEDGE_CONFIRM",  # 等待平掉套保单确认
             # "WAITING_CLOSE_HEDGE_CONFIRM",  # 等待平掉套保单确认
+            "WAITING_TRANSFER_ARRIVE",      # 等待交易所充值到账
             "TRANSFERRING_TO_CHAIN",        # 正在向链上转账
             "TRANSFERRING_TO_CHAIN",        # 正在向链上转账
             "WAITING_WITHDRAWAL_CONFIRM",   # 等待链上提现确认
             "WAITING_WITHDRAWAL_CONFIRM",   # 等待链上提现确认
             "COMPLETED",                    # 套利流程完成
             "COMPLETED",                    # 套利流程完成
@@ -105,12 +106,13 @@ class ArbitrageProcess:
         ]
         ]
 
 
         self.STATE_IDLE = "IDLE"
         self.STATE_IDLE = "IDLE"
+        self.STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE"
+        self.STATE_WAITING_SELL_CONFIRM = "WAITING_SELL_CONFIRM"
         self.STATE_BUYING_ON_CHAIN = "BUYING_ON_CHAIN"
         self.STATE_BUYING_ON_CHAIN = "BUYING_ON_CHAIN"
         self.STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM"
         self.STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM"
-        self.STATE_TRANSFERRING_TO_EXCHANGE = "TRANSFERRING_TO_EXCHANGE"
+        self.STATE_WAITING_EXCHANGE_ROLLBACK = "WAITING_EXCHANGE_ROLLBACK"
+        # self.STATE_TRANSFERRING_TO_EXCHANGE = "TRANSFERRING_TO_EXCHANGE"
         self.STATE_WAITING_TRANSFER_ARRIVE = "WAITING_TRANSFER_ARRIVE"
         self.STATE_WAITING_TRANSFER_ARRIVE = "WAITING_TRANSFER_ARRIVE"
-        self.STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE"
-        self.STATE_WAITING_SELL_CONFIRM = "WAITING_SELL_CONFIRM"
         self.STATE_TRANSFERRING_TO_CHAIN = "TRANSFERRING_TO_CHAIN"
         self.STATE_TRANSFERRING_TO_CHAIN = "TRANSFERRING_TO_CHAIN"
         self.STATE_WAITING_WITHDRAWAL_CONFIRM = "WAITING_WITHDRAWAL_CONFIRM"
         self.STATE_WAITING_WITHDRAWAL_CONFIRM = "WAITING_WITHDRAWAL_CONFIRM"
         self.STATE_COMPLETED = "COMPLETED"
         self.STATE_COMPLETED = "COMPLETED"
@@ -134,20 +136,27 @@ class ArbitrageProcess:
         根据当前状态执行套利流程的下一步
         根据当前状态执行套利流程的下一步
         这是一个周期性调用的函数,例如在主循环中调用
         这是一个周期性调用的函数,例如在主循环中调用
         """
         """
-        if self.current_state == "BUYING_ON_CHAIN":
+
+        if self.current_state == "SELLING_ON_EXCHANGE":
+            self._execute_sell_on_exchange()
+
+        elif self.current_state == "WAITING_SELL_CONFIRM":
+            self._wait_sell_confirm()
+
+        elif self.current_state == "BUYING_ON_CHAIN":
             self._execute_buy_on_chain()
             self._execute_buy_on_chain()
 
 
         elif self.current_state == "WAITING_CHAIN_CONFIRM":
         elif self.current_state == "WAITING_CHAIN_CONFIRM":
             self._wait_chain_confirm()
             self._wait_chain_confirm()
 
 
+        elif self.current_state == "WAITING_EXCHANGE_ROLLBACK":
+            self._wait_exchange_rollback()
+
         # elif self.current_state == "TRANSFERRING_TO_EXCHANGE":
         # elif self.current_state == "TRANSFERRING_TO_EXCHANGE":
         #     self._execute_transfer_to_exchange()
         #     self._execute_transfer_to_exchange()
 
 
-        elif self.current_state == "SELLING_ON_EXCHANGE":
-            self._execute_sell_on_exchange()
-
-        elif self.current_state == "WAITING_SELL_CONFIRM":
-            self._wait_sell_confirm()
+        elif self.current_state == "WAITING_TRANSFER_ARRIVE":
+            self._wait_transfer_arrive()
 
 
         elif self.current_state == "TRANSFERRING_TO_CHAIN":
         elif self.current_state == "TRANSFERRING_TO_CHAIN":
              self._execute_transfer_to_chain()
              self._execute_transfer_to_chain()
@@ -155,9 +164,6 @@ class ArbitrageProcess:
         elif self.current_state == "WAITING_WITHDRAWAL_CONFIRM":
         elif self.current_state == "WAITING_WITHDRAWAL_CONFIRM":
              self._wait_withdrawal_confirm()
              self._wait_withdrawal_confirm()
 
 
-        elif self.current_state == "WAITING_TRANSFER_ARRIVE":
-            self._wait_transfer_arrive()
-
         elif self.current_state == "COMPLETED":
         elif self.current_state == "COMPLETED":
             msg = "套利流程成功完成!"
             msg = "套利流程成功完成!"
             logging.info(msg)
             logging.info(msg)
@@ -168,28 +174,32 @@ class ArbitrageProcess:
             logging.info(msg)
             logging.info(msg)
             add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
             add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
 
 
-    # 以下是每个状态对应的具体执行函数(伪代码)
-
-    def _execute_buy_on_chain(self):
+    # 以下是每个状态对应的具体执行函数
+    def _execute_sell_on_exchange(self):
         """
         """
-        在链上执行买入操作
+        在中心化交易所卖出现货
         """
         """
-        msg = "执行:链上买入操作..."
+        msg = "执行:中心化交易所卖出现货..."
         logging.info(msg)
         logging.info(msg)
         add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
         try:
+            # 第一步直接卖出,这个数量用固定数量
+            pseudo_amount_to_sell = self.arbitrage_details["exchange_out_amount"]
+            # 处理精度
+            pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(Decimal('1'), rounding=ROUND_DOWN)
+
             # 交易所套保余额判断
             # 交易所套保余额判断
             balances = mexc.trade.get_account_info()['balances']
             balances = mexc.trade.get_account_info()['balances']
             for balance in balances:
             for balance in balances:
                 if balance['asset'] == self.coin:
                 if balance['asset'] == self.coin:
-                    if Decimal(balance['free']) < Decimal(self.arbitrage_details['out_token_amount_human']):
-                        msg = f"交易所剩余{self.coin}: {balance['free']}, 链上预计购入:{self.arbitrage_details['out_token_amount_human']}, 不能触发套保交易。"
+                    if Decimal(balance['free']) < pseudo_amount_to_sell:
+                        msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 不能触发套保交易。"
                         logging.info(msg)
                         logging.info(msg)
                         add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                         add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                         self._set_state(self.STATE_FAILED)
                         self._set_state(self.STATE_FAILED)
                         return
                         return
                     else:
                     else:
-                        msg = f"交易所剩余{self.coin}: {balance['free']}, 链上预计购入:{self.arbitrage_details['out_token_amount_human']}, 余额校验通过(可以套保)。"
+                        msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 余额校验通过(可以套保)。"
                         logging.info(msg)
                         logging.info(msg)
                         add_state_flow_entry(self.process_item, self.current_state, msg, "success")
                         add_state_flow_entry(self.process_item, self.current_state, msg, "success")
                         break
                         break
@@ -205,7 +215,88 @@ class ArbitrageProcess:
             msg = f"链上剩余{self.base_coin}: {from_token_balance}, 需要使用:{self.arbitrage_details["chain_usdt_use"]}, 余额充足。"
             msg = f"链上剩余{self.base_coin}: {from_token_balance}, 需要使用:{self.arbitrage_details["chain_usdt_use"]}, 余额充足。"
             logging.info(msg)
             logging.info(msg)
             add_state_flow_entry(self.process_item, self.current_state, msg, "success")
             add_state_flow_entry(self.process_item, self.current_state, msg, "success")
-                
+            
+            order_params = {
+                "symbol": self.symbol.replace('_', ''),
+                "side": "SELL",
+                "type": "MARKET",
+                "quantity": int(pseudo_amount_to_sell),
+            }
+            logging.info(order_params)
+            exchange_sell_order = mexc.trade.post_order(order_params)
+            if 'orderId' not in exchange_sell_order:
+                msg = f"交易所现货卖出下单失败:{exchange_sell_order}"
+                logging.error(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                self._set_state("FAILED")
+
+                return
+
+            exchange_sell_order_id = exchange_sell_order['orderId']
+            
+            msg = f"交易所现货卖出订单已发送, 订单ID: {exchange_sell_order_id}"
+            logging.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+            self.arbitrage_details["exchange_sell_order_id"] = exchange_sell_order_id
+            self._set_state(self.STATE_WAITING_SELL_CONFIRM)
+        except Exception as e:
+            logging.error(f"交易所现货卖出下单失败:{e}")
+            self._set_state("FAILED")
+
+    def _wait_sell_confirm(self):
+        """
+        等待交易所现货卖出订单确认(完全成交)
+        """
+        exchange_sell_order_id = self.arbitrage_details["exchange_sell_order_id"]
+        msg = f"等待交易所现货卖出订单确认:{exchange_sell_order_id}"
+        logging.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+        try:
+            # 查询交易所订单状态
+            waiting_times = 30
+            while True:
+                params = {
+                    "symbol": self.symbol.replace('_', ''),
+                    "orderId": exchange_sell_order_id
+                }
+                order = mexc.trade.get_order(params)
+
+                if order['status'] == "FILLED":
+                    money = Decimal(order['cummulativeQuoteQty'])
+                    amount = self.arbitrage_details["exchange_out_amount"]
+                    price = money / amount
+                    price = price.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
+
+                    msg = f"交易所现货卖出订单已完全成交, 价格:{price}。{order}"
+                    self.arbitrage_details["exchange_withdraw_amount"] = order['cummulativeQuoteQty']
+
+                    logging.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                    self._set_state(self.STATE_BUYING_ON_CHAIN)
+                    return
+                else:
+                    # 继续等待成交
+                    pass
+
+                time.sleep(1)
+                waiting_times = waiting_times - 1
+
+        except Exception as e:
+            msg = f"查询交易所现货卖出订单状态时发生错误:{e}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            self._set_state("FAILED")
+
+    def _execute_buy_on_chain(self):
+        """
+        在链上执行买入操作
+        """
+        msg = "执行:链上买入操作..."
+        logging.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+        try:                
             self.arbitrage_details["chain_amount_before_trade"] = web3.get_erc20_balance(self.to_token_addr, self.user_exchange_wallet)
             self.arbitrage_details["chain_amount_before_trade"] = web3.get_erc20_balance(self.to_token_addr, self.user_exchange_wallet)
             # 调用链上客户端执行买入交易
             # 调用链上客户端执行买入交易
             chain_buy_tx_hash = web3._sign_and_send_transaction(
             chain_buy_tx_hash = web3._sign_and_send_transaction(
@@ -225,7 +316,7 @@ class ArbitrageProcess:
             msg = f"链上买入失败:{e}"
             msg = f"链上买入失败:{e}"
             logging.error(msg)
             logging.error(msg)
             add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
             add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
-            self._set_state("FAILED")
+            self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
 
 
     def _wait_chain_confirm(self):
     def _wait_chain_confirm(self):
         """
         """
@@ -256,86 +347,95 @@ class ArbitrageProcess:
                 msg = f"链上交易已确认。用{sell_amount_human}买入{buy_amount_human},价格{price_human}。"
                 msg = f"链上交易已确认。用{sell_amount_human}买入{buy_amount_human},价格{price_human}。"
                 logging.info(msg)
                 logging.info(msg)
                 add_state_flow_entry(self.process_item, self.current_state, msg, "success")
                 add_state_flow_entry(self.process_item, self.current_state, msg, "success")
-                self._set_state(self.STATE_SELLING_ON_EXCHANGE)
+                self._set_state(self.STATE_WAITING_TRANSFER_ARRIVE)
             else:
             else:
                 msg = f"链上交易确认失败:{hash}"
                 msg = f"链上交易确认失败:{hash}"
                 logging.error(msg)
                 logging.error(msg)
                 add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                 add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
-                self._set_state("FAILED")
+                self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
 
 
         except Exception as e:
         except Exception as e:
             msg = f"查询链上确认状态时发生错误:{e}"
             msg = f"查询链上确认状态时发生错误:{e}"
             logging.error(msg)
             logging.error(msg)
             add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
             add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
-            self._set_state("FAILED")
+            self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
 
 
-    def _execute_sell_on_exchange(self):
+    def _wait_exchange_rollback(self):
         """
         """
-        在中心化交易所卖出现货
+        市价进行交易所交易回滚
         """
         """
-        msg = "执行:中心化交易所卖出现货..."
+        msg = "执行:中心化交易所买入现货回滚..."
         logging.info(msg)
         logging.info(msg)
         add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
         try:
-            pseudo_amount_to_sell = self.arbitrage_details["chain_buy_amount"]
-            # pseudo_amount_to_sell = Decimal('200000')
+            # 使用预提现数量进行回滚
+            pseudo_amount_to_buy = self.arbitrage_details["exchange_withdraw_amount"]
             # 处理精度
             # 处理精度
-            pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(Decimal('1'), rounding=ROUND_DOWN)
+            pseudo_amount_to_buy = pseudo_amount_to_buy.quantize(Decimal('1'), rounding=ROUND_DOWN)
+
+            # 交易所U余额判断
+            balances = mexc.trade.get_account_info()['balances']
+            for balance in balances:
+                if balance['asset'] == self.base_coin:
+                    pseudo_amount_to_buy = min(Decimal(balance['free']), pseudo_amount_to_buy)
+
+                    if pseudo_amount_to_buy < Decimal('10'):
+                        msg = f"交易所剩余{self.base_coin}: {balance['free']}, 小于10, 不能触发回滚交易。"
+                        logging.info(msg)
+                        add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                        self._set_state(self.STATE_FAILED)
+                        return
+                    else:
+                        msg = f"交易所剩余{self.base_coin}: {balance['free']}, 交易所准备使用:{pseudo_amount_to_buy}, 余额校验通过(可以回滚)。"
+                        logging.info(msg)
+                        add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+                        break
             
             
             order_params = {
             order_params = {
                 "symbol": self.symbol.replace('_', ''),
                 "symbol": self.symbol.replace('_', ''),
-                "side": "SELL",
+                "side": "BUY",
                 "type": "MARKET",
                 "type": "MARKET",
-                "quantity": int(pseudo_amount_to_sell),
+                "quoteOrderQty": int(pseudo_amount_to_buy),
             }
             }
             logging.info(order_params)
             logging.info(order_params)
-            exchange_sell_order = mexc.trade.post_order(order_params)
-            if 'orderId' not in exchange_sell_order:
-                msg = f"交易所现货卖出下单失败:{exchange_sell_order}"
+            exchange_buy_order = mexc.trade.post_order(order_params)
+            if 'orderId' not in exchange_buy_order:
+                msg = f"【回滚】交易所现货买入下单失败:{exchange_buy_order}"
                 logging.error(msg)
                 logging.error(msg)
                 add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                 add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                 self._set_state("FAILED")
                 self._set_state("FAILED")
 
 
                 return
                 return
 
 
-            exchange_sell_order_id = exchange_sell_order['orderId']
+            exchange_buy_order_id = exchange_buy_order['orderId']
             
             
-            msg = f"交易所现货卖出订单已发送, 订单ID: {exchange_sell_order_id}"
+            msg = f"【回滚】交易所现货买入订单已发送, 订单ID: {exchange_buy_order_id}"
             logging.info(msg)
             logging.info(msg)
             add_state_flow_entry(self.process_item, self.current_state, msg, "success")
             add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
 
-            self.arbitrage_details["exchange_sell_order_id"] = exchange_sell_order_id
-            self._set_state(self.STATE_WAITING_SELL_CONFIRM)
-        except Exception as e:
-            logging.error(f"交易所现货卖出下单失败:{e}")
-            self._set_state("FAILED")
-
-    def _wait_sell_confirm(self):
-        """
-        等待交易所现货卖出订单确认(完全成交)
-        """
-        exchange_sell_order_id = self.arbitrage_details["exchange_sell_order_id"]
-        msg = f"等待交易所现货卖出订单确认:{exchange_sell_order_id}"
-        logging.info(msg)
-        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
-        try:
             # 查询交易所订单状态
             # 查询交易所订单状态
             waiting_times = 30
             waiting_times = 30
+            last_query_rst = None
             while True:
             while True:
                 params = {
                 params = {
                     "symbol": self.symbol.replace('_', ''),
                     "symbol": self.symbol.replace('_', ''),
-                    "orderId": exchange_sell_order_id
+                    "orderId": exchange_buy_order_id
                 }
                 }
                 order = mexc.trade.get_order(params)
                 order = mexc.trade.get_order(params)
+                last_query_rst = order
 
 
                 if order['status'] == "FILLED":
                 if order['status'] == "FILLED":
-                    msg = f"交易所现货卖出订单已完全成交。{order}"
-                    self.arbitrage_details["exchange_withdraw_amount"] = order['cummulativeQuoteQty']
+                    money = Decimal(order['cummulativeQuoteQty'])
+                    amount = self.arbitrage_details["exchange_out_amount"]
+                    price = money / amount
+                    price = price.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
+
+                    msg = f"【回滚】交易所现货买入订单已完全成交, 价格:{price}。{order}"
 
 
                     logging.info(msg)
                     logging.info(msg)
                     add_state_flow_entry(self.process_item, self.current_state, msg, "success")
                     add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
 
-                    self._set_state(self.STATE_WAITING_TRANSFER_ARRIVE)
+                    self._set_state(self.STATE_FAILED)
                     return
                     return
                 else:
                 else:
                     # 继续等待成交
                     # 继续等待成交
@@ -343,11 +443,14 @@ class ArbitrageProcess:
 
 
                 time.sleep(1)
                 time.sleep(1)
                 waiting_times = waiting_times - 1
                 waiting_times = waiting_times - 1
+            
+            msg = f"【回滚】回滚交易订单查询超时, 订单ID: {exchange_buy_order_id},最终状态:{last_query_rst}"
+            logging.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
 
 
+            self._set_state(self.STATE_FAILED)
         except Exception as e:
         except Exception as e:
-            msg = f"查询交易所现货卖出订单状态时发生错误:{e}"
-            logging.error(msg)
-            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            logging.error(f"【回滚】交易所回滚交易失败:{e}")
             self._set_state("FAILED")
             self._set_state("FAILED")
 
 
     def _wait_transfer_arrive(self):
     def _wait_transfer_arrive(self):
@@ -555,7 +658,7 @@ if __name__ == "__main__":
                                         SYMBOL, process_item)
                                         SYMBOL, process_item)
 
 
     # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
     # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
-    ap._set_state(ap.STATE_BUYING_ON_CHAIN)
+    ap._set_state(ap.SELLING_ON_EXCHANGE)
 
 
     # 在主循环中周期性调用 run_arbitrage_step
     # 在主循环中周期性调用 run_arbitrage_step
     while ap.current_state != "COMPLETED" and ap.current_state != "FAILED":
     while ap.current_state != "COMPLETED" and ap.current_state != "FAILED":
@@ -563,8 +666,8 @@ if __name__ == "__main__":
 
 
         if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
         if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
             time.sleep(10)
             time.sleep(10)
-        else:
-            time.sleep(1)
+        # else:
+        #     time.sleep(1)
 
 
     logging.info(process_item)
     logging.info(process_item)
     if ap.current_state == "COMPLETED":
     if ap.current_state == "COMPLETED":

+ 13 - 16
as.py

@@ -106,24 +106,25 @@ def arbitrage_process_flow(process_item):
     USER_EXCHANGE_WALLET = process_item['userExchangeWallet']
     USER_EXCHANGE_WALLET = process_item['userExchangeWallet']
     USER_WALLET = process_item['userWallet']
     USER_WALLET = process_item['userWallet']
     SYMBOL = process_item['symbol']
     SYMBOL = process_item['symbol']
+    EXCHANGE_OUT_AMOUNT = process_item['exchangeOutAmount']
 
 
 
 
     gas_price_multiplier = 1
     gas_price_multiplier = 1
-    if profit > 2:
-        gas_price_multiplier = 1.1
-    elif profit > 5:
-        gas_price_multiplier = 1.5
-    elif profit > 10:
-        gas_price_multiplier = 2
+    # if profit > 2:
+    #     gas_price_multiplier = 1.1
+    # elif profit > 5:
+    #     gas_price_multiplier = 1.5
+    # elif profit > 10:
+    # gas_price_multiplier = 2
 
 
     ap = arbitrage_process.ArbitrageProcess(tx, 2, gas_price_multiplier, 
     ap = arbitrage_process.ArbitrageProcess(tx, 2, gas_price_multiplier, 
                                                            FROM_TOKEN, TO_TOKEN, 
                                                            FROM_TOKEN, TO_TOKEN, 
-                                                           FROM_TOKEN_AMOUNT_HUMAM, TO_TOKEN_AMOUNT_HUMAM,
+                                                           FROM_TOKEN_AMOUNT_HUMAM, EXCHANGE_OUT_AMOUNT,
                                                            USER_EXCHANGE_WALLET, USER_WALLET,
                                                            USER_EXCHANGE_WALLET, USER_WALLET,
                                                            SYMBOL, process_item)
                                                            SYMBOL, process_item)
 
 
     # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
     # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
-    ap._set_state(ap.STATE_BUYING_ON_CHAIN)
+    ap._set_state(ap.STATE_SELLING_ON_EXCHANGE)
 
 
     # 在主循环中周期性调用 run_arbitrage_step
     # 在主循环中周期性调用 run_arbitrage_step
     while ap.current_state != "COMPLETED" and ap.current_state != "FAILED":
     while ap.current_state != "COMPLETED" and ap.current_state != "FAILED":
@@ -131,14 +132,8 @@ def arbitrage_process_flow(process_item):
 
 
         if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
         if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
             time.sleep(10)
             time.sleep(10)
-        else:
-            time.sleep(1)
 
 
-    logging.info(process_item)
-    if ap.current_state == "COMPLETED":
-        logging.info("套利流程执行成功!")
-    else:
-        logging.info("套利流程执行失败!")
+    ap.run_arbitrage_step()
 
 
     move_completed_process_to_history(process_id)
     move_completed_process_to_history(process_id)
     
     
@@ -149,7 +144,7 @@ def handle_submit_process():
     if not data:
     if not data:
         return jsonify({"error": "无效的 JSON 请求体"}), 400
         return jsonify({"error": "无效的 JSON 请求体"}), 400
 
 
-    required_fields = ['tx', 'profit', 'profitLimit', 'symbol', 'fromToken', 'fromTokenAmountHuman', 'fromTokenDecimal', 'toToken', 'toTokenAmountHuman']
+    required_fields = ['tx', 'profit', 'profitLimit', 'symbol', 'fromToken', 'fromTokenAmountHuman', 'fromTokenDecimal', 'toToken', 'toTokenAmountHuman', 'exchangeOutAmount']
     for field in required_fields:
     for field in required_fields:
         if field not in data:
         if field not in data:
             return jsonify({"error": f"缺少字段: {field}"}), 400
             return jsonify({"error": f"缺少字段: {field}"}), 400
@@ -160,6 +155,7 @@ def handle_submit_process():
         from_token_amount_human = decimal.Decimal(str(data['fromTokenAmountHuman']))    # fromToken 的人类可读数量
         from_token_amount_human = decimal.Decimal(str(data['fromTokenAmountHuman']))    # fromToken 的人类可读数量
         from_token_decimal = decimal.Decimal(str(data['fromTokenDecimal']))             # fromToken 的小数位数
         from_token_decimal = decimal.Decimal(str(data['fromTokenDecimal']))             # fromToken 的小数位数
         to_token_amount_human = decimal.Decimal(str(data['toTokenAmountHuman']))        # toToken 的人类可读数量
         to_token_amount_human = decimal.Decimal(str(data['toTokenAmountHuman']))        # toToken 的人类可读数量
+        exchange_out_amount = decimal.Decimal(str(data['exchangeOutAmount']))           # 交易所需要卖出的数量
     except (decimal.InvalidOperation, ValueError) as e:
     except (decimal.InvalidOperation, ValueError) as e:
         return jsonify({"error": f"请求体中包含无效的小数/整数值: {e}"}), 400
         return jsonify({"error": f"请求体中包含无效的小数/整数值: {e}"}), 400
 
 
@@ -187,6 +183,7 @@ def handle_submit_process():
             "fromTokenAmountHuman": str(from_token_amount_human), # 起始代币数量 (人类可读, 字符串存储)
             "fromTokenAmountHuman": str(from_token_amount_human), # 起始代币数量 (人类可读, 字符串存储)
             "fromTokenDecimal": from_token_decimal, # 起始代币小数位数
             "fromTokenDecimal": from_token_decimal, # 起始代币小数位数
             "toTokenAmountHuman": str(to_token_amount_human),
             "toTokenAmountHuman": str(to_token_amount_human),
+            "exchangeOutAmount": str(exchange_out_amount),
             "toToken": data['toToken'], # 目标代币
             "toToken": data['toToken'], # 目标代币
             "stateFlow": [], # 状态流转记录
             "stateFlow": [], # 状态流转记录
             "currentState": "PENDING_START", # 当前状态
             "currentState": "PENDING_START", # 当前状态

+ 1 - 1
mexc_client.py

@@ -565,7 +565,7 @@ if __name__ == '__main__':
             #     'coin': 'USDT',
             #     'coin': 'USDT',
             #     'netWork': 'ETH',
             #     'netWork': 'ETH',
             #     'address': '0xb1f33026db86a86372493a3b124d7123e9045bb4',
             #     'address': '0xb1f33026db86a86372493a3b124d7123e9045bb4',
-            #     'amount': 392
+            #     'amount': 57
             # }
             # }
             # withdraw_rst = client.wallet.post_withdraw(withdraw_params)
             # withdraw_rst = client.wallet.post_withdraw(withdraw_params)
             # print(f"  提笔响应:{withdraw_rst}")
             # print(f"  提笔响应:{withdraw_rst}")

+ 3 - 2
ok_chain_client.py

@@ -118,7 +118,7 @@ def send_post_request(request_path, body_params_dict=None):
         print(f"Failed to decode JSON. Response content: {response.text}")
         print(f"Failed to decode JSON. Response content: {response.text}")
     return None
     return None
 
 
-def swap(chain_id, amount, from_token_address, to_token_address, slippage, user_wallet_address, receiver_address=None):
+def swap(chain_id, amount, from_token_address, to_token_address, slippage, user_wallet_address, receiver_address=None, gas_level='average'):
     get_request_path = '/api/v5/dex/aggregator/swap'
     get_request_path = '/api/v5/dex/aggregator/swap'
     get_params = {
     get_params = {
         'chainIndex': chain_id,
         'chainIndex': chain_id,
@@ -126,7 +126,8 @@ def swap(chain_id, amount, from_token_address, to_token_address, slippage, user_
         'fromTokenAddress': from_token_address,
         'fromTokenAddress': from_token_address,
         'toTokenAddress': to_token_address,
         'toTokenAddress': to_token_address,
         'slippage': slippage,
         'slippage': slippage,
-        'userWalletAddress': user_wallet_address
+        'userWalletAddress': user_wallet_address,
+        'gasLevel': gas_level
     }
     }
 
 
     if receiver_address is not None:
     if receiver_address is not None:

+ 20 - 14
price_checker_ok.py

@@ -20,6 +20,7 @@ ARB_EXECUTOR_URL = "http://localhost:5002/submit_process"
 
 
 # --- 配置部分 ---
 # --- 配置部分 ---
 IN_AMOUNT_TO_QUERY = decimal.Decimal('350')
 IN_AMOUNT_TO_QUERY = decimal.Decimal('350')
+EXCHANGE_OUT_AMOUNT = decimal.Decimal('10000000')
 PROFIT_LIMIT = 0.015                 # 1.5%利润才触发交易,稳妥模式
 PROFIT_LIMIT = 0.015                 # 1.5%利润才触发交易,稳妥模式
 IN_TOKEN_ADDRESS = '0xdAC17F958D2ee523a2206206994597C13D831ec7' # USDT on Ethereum
 IN_TOKEN_ADDRESS = '0xdAC17F958D2ee523a2206206994597C13D831ec7' # USDT on Ethereum
 IN_TOKEN_DECIMALS = 6
 IN_TOKEN_DECIMALS = 6
@@ -40,7 +41,7 @@ mode = None
 def get_chain_price_vs_target_currency(chain_id, in_token_addr, out_token_addr, amount, in_token_decimals, slippage, user_wallet_addr, user_exchange_wallet_addr):
 def get_chain_price_vs_target_currency(chain_id, in_token_addr, out_token_addr, amount, in_token_decimals, slippage, user_wallet_addr, user_exchange_wallet_addr):
     try:
     try:
         in_token_amount = amount * (10 ** in_token_decimals)
         in_token_amount = amount * (10 ** in_token_decimals)
-        data = ok_chain_client.swap(chain_id, in_token_amount, in_token_addr, out_token_addr, slippage, user_wallet_addr, user_exchange_wallet_addr)
+        data = ok_chain_client.swap(chain_id, in_token_amount, in_token_addr, out_token_addr, slippage, user_wallet_addr, user_exchange_wallet_addr, 'fast')
 
 
         if data.get('code') == '0' and data.get('data'):
         if data.get('code') == '0' and data.get('data'):
             d = data['data'][0]
             d = data['data'][0]
@@ -127,7 +128,8 @@ def send_arb_msg(profit, data):
         "fromTokenAmountHuman": str(human_in_base),
         "fromTokenAmountHuman": str(human_in_base),
         "fromTokenDecimal": IN_TOKEN_DECIMALS,                  
         "fromTokenDecimal": IN_TOKEN_DECIMALS,                  
         "toToken": OUT_TOKEN_ADDRESS,
         "toToken": OUT_TOKEN_ADDRESS,
-        "toTokenAmountHuman": str(human_out_target)
+        "toTokenAmountHuman": str(human_out_target),
+        "exchangeOutAmount": str(EXCHANGE_OUT_AMOUNT)
     }
     }
 
 
     print(f"正在提交套利数据到 {ARB_EXECUTOR_URL}")
     print(f"正在提交套利数据到 {ARB_EXECUTOR_URL}")
@@ -169,7 +171,21 @@ def update_data_for_plotly_and_table():
         fetch_time_full = time.strftime("%Y-%m-%d %H:%M:%S")
         fetch_time_full = time.strftime("%Y-%m-%d %H:%M:%S")
         fetch_time_chart = time.strftime("%H:%M:%S")
         fetch_time_chart = time.strftime("%H:%M:%S")
 
 
-        # 1. Okx: price_base_per_target (e.g., USDT / RATO)
+        # 1. MEXC: price_target_per_usdt_bid1 (e.g., RATO / USDT)
+        mexc_data = get_mexc_spot_price_target_usdt_bid(MEXC_TARGET_PAIR_USDT)
+        mexc_price_target_per_usdt_bid1 = mexc_data.get("price_target_per_usdt_bid1") # TARGET/USDT
+        mexc_err = mexc_data.get("error")
+
+        # Convert MEXC price to USDT / TARGET for comparison
+        mexc_price_usdt_per_target_bid1 = None
+        if mexc_price_target_per_usdt_bid1 is not None and mexc_price_target_per_usdt_bid1 > 0:
+            mexc_price_usdt_per_target_bid1 = mexc_price_target_per_usdt_bid1 # TARGET / USDT
+        elif not mexc_err and mexc_price_target_per_usdt_bid1 is not None : # Price is 0 or less, but not API error
+             mexc_err = mexc_err or "MEXC价格为0或无效"
+
+        # 2. Okx: price_base_per_target (e.g., USDT / RATO)
+        IN_AMOUNT_TO_QUERY = mexc_price_target_per_usdt_bid1 * EXCHANGE_OUT_AMOUNT
+        IN_AMOUNT_TO_QUERY = IN_AMOUNT_TO_QUERY.quantize(decimal.Decimal('1'), rounding=decimal.ROUND_DOWN)
         oo_data, data = get_chain_price_vs_target_currency(
         oo_data, data = get_chain_price_vs_target_currency(
             CHAIN_ID,
             CHAIN_ID,
             IN_TOKEN_ADDRESS,
             IN_TOKEN_ADDRESS,
@@ -183,17 +199,7 @@ def update_data_for_plotly_and_table():
         oo_price_usdt_per_target = oo_data.get("price_base_per_target") # USDT/TARGET
         oo_price_usdt_per_target = oo_data.get("price_base_per_target") # USDT/TARGET
         oo_err = oo_data.get("error")
         oo_err = oo_data.get("error")
 
 
-        # 2. MEXC: price_target_per_usdt_bid1 (e.g., RATO / USDT)
-        mexc_data = get_mexc_spot_price_target_usdt_bid(MEXC_TARGET_PAIR_USDT)
-        mexc_price_target_per_usdt_bid1 = mexc_data.get("price_target_per_usdt_bid1") # TARGET/USDT
-        mexc_err = mexc_data.get("error")
-
-        # Convert MEXC price to USDT / TARGET for comparison
-        mexc_price_usdt_per_target_bid1 = None
-        if mexc_price_target_per_usdt_bid1 is not None and mexc_price_target_per_usdt_bid1 > 0:
-            mexc_price_usdt_per_target_bid1 = mexc_price_target_per_usdt_bid1 # TARGET / USDT
-        elif not mexc_err and mexc_price_target_per_usdt_bid1 is not None : # Price is 0 or less, but not API error
-             mexc_err = mexc_err or "MEXC价格为0或无效"
+        logging.info(f"交易所卖出{EXCHANGE_OUT_AMOUNT}, 链上大约需要花{IN_AMOUNT_TO_QUERY}去对冲。")
 
 
         # 3. Calculate Difference
         # 3. Calculate Difference
         diff_oo_vs_mexc_bid1_pct = calculate_percentage_diff(
         diff_oo_vs_mexc_bid1_pct = calculate_percentage_diff(

+ 4 - 2
submit_process_demo.py

@@ -14,7 +14,8 @@ def create_mock_arbitrage_data():
     CHAIN_ID = 1
     CHAIN_ID = 1
     IN_TOKEN_ADDRESS = '0xdAC17F958D2ee523a2206206994597C13D831ec7' # USDT on Ethereum
     IN_TOKEN_ADDRESS = '0xdAC17F958D2ee523a2206206994597C13D831ec7' # USDT on Ethereum
     IN_TOKEN_DECIMALS = 6
     IN_TOKEN_DECIMALS = 6
-    IN_AMOUNT_TO_QUERY = Decimal('20')
+    EXCHANGE_OUT_AMOUNT = Decimal('1000000')
+    IN_AMOUNT_TO_QUERY = Decimal('30')
     OUT_TOKEN_ADDRESS = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860' # RATO on Ethereum
     OUT_TOKEN_ADDRESS = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860' # RATO on Ethereum
     USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4'
     USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4'
     USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
     USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
@@ -51,7 +52,8 @@ def create_mock_arbitrage_data():
         "fromTokenAmountHuman": str(human_in_base),
         "fromTokenAmountHuman": str(human_in_base),
         "fromTokenDecimal": IN_TOKEN_DECIMALS,                  
         "fromTokenDecimal": IN_TOKEN_DECIMALS,                  
         "toToken": OUT_TOKEN_ADDRESS,
         "toToken": OUT_TOKEN_ADDRESS,
-        "toTokenAmountHuman": str(human_out_target)
+        "toTokenAmountHuman": str(human_out_target),
+        "exchangeOutAmount": str(EXCHANGE_OUT_AMOUNT)
     }
     }
     return data
     return data