Jelajahi Sumber

已经连起来测试了,加入了套保余额检测。

skyfffire 5 bulan lalu
induk
melakukan
4def117169
4 mengubah file dengan 129 tambahan dan 56 penghapusan
  1. 68 46
      arbitrage_process.py
  2. 50 6
      arbitrage_system.py
  3. 1 1
      price_checker_ok.py
  4. 10 3
      submit_process_demo.py

+ 68 - 46
arbitrage_process.py

@@ -3,7 +3,7 @@ import logging
 import datetime
 from web3_py_client import EthClient
 from mexc_client import MexcClient
-from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN
+from decimal import Decimal, ROUND_DOWN
 
 web3 = EthClient()
 mexc = MexcClient()
@@ -40,7 +40,7 @@ def add_state_flow_entry(process_item, state_name, msg, status_val="pending"):
 
 class ArbitrageProcess:
     def __init__(self, tx, gas_limit_multiplier, gas_price_multiplier, 
-                 from_token, to_token, from_token_amount_human, 
+                 from_token, to_token, from_token_amount_human, out_token_amount_human,
                  user_exchange_wallet, user_wallet,
                  symbol, process_item):
         """
@@ -71,13 +71,14 @@ class ArbitrageProcess:
         self.process_item = process_item
 
         # 存储当前套利交易的细节信息,例如买入数量、价格等
-        chain_usdt_use = decimal.Decimal(from_token_amount_human)
+        chain_usdt_use = Decimal(from_token_amount_human)
         self.arbitrage_details = {
             "chain_buy_tx_hash": None,                  # 链上买入的tx hash
             "chain_usdt_use": chain_usdt_use,           # 链上usdt减少量(使用量), todo, 暂用固定值代替
+            "out_token_amount_human": out_token_amount_human, # 链上可能会产生的out数量,交易所的代币剩余量一定要大于这个值
             "chain_amount_before_trade": 0,
             "chain_amount_after_trade": 0,
-            "chain_buy_amount": decimal.Decimal('0'),   # 链上币增加量(购入量), todo, 暂用即时余额代替
+            "chain_buy_amount": Decimal('0'),   # 链上币增加量(购入量), todo, 暂用即时余额代替
             "chain_buy_price": None,                    # 链上购入价, todo
             "chain_withdrawal_tx_hash": None,           # 链上转入交易所的tx
             "exchange_sell_order_id": None,             # 交易所卖出id
@@ -173,6 +174,21 @@ class ArbitrageProcess:
         logging.info(msg)
         add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
+            balances = mexc.trade.get_account_info()['balances']
+            for balance in balances:
+                if balance['asset'] == self.coin:
+                    if Decimal(balance['free']) < Decimal(self.arbitrage_details['out_token_amount_human']):
+                        msg = f"交易所剩余{self.coin}: {balance['free']}, 链上预计购入:{self.arbitrage_details['out_token_amount_human']}, 不能触发套保交易。"
+                        logging.info(msg)
+                        add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                        self._set_state(self.STATE_FAILED)
+                        return
+                    else:
+                        msg = f"交易所剩余{self.coin}: {balance['free']}, 链上预计购入:{self.arbitrage_details['out_token_amount_human']}, 余额校验通过(可以套保)。"
+                        logging.info(msg)
+                        add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+                        break
+                
             self.arbitrage_details["chain_amount_before_trade"] = web3.get_erc20_balance(self.to_token_addr, self.user_exchange_wallet)
             # 调用链上客户端执行买入交易
             chain_buy_tx_hash = web3._sign_and_send_transaction(
@@ -213,12 +229,12 @@ class ArbitrageProcess:
                 self.arbitrage_details["chain_amount_after_trade"] = web3.get_erc20_balance(self.to_token_addr, self.user_exchange_wallet)
                 actual_buy_amount = self.arbitrage_details["chain_amount_after_trade"] - self.arbitrage_details["chain_amount_before_trade"]
 
-                buy_amount_human = actual_buy_amount.quantize(decimal.Decimal('1e-2'), rounding=ROUND_DOWN)
+                buy_amount_human = actual_buy_amount.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
                 sell_amount_human = self.arbitrage_details["chain_usdt_use"]
                 self.arbitrage_details["chain_buy_amount"] = buy_amount_human # 存储实际买入数量
 
                 price_human = sell_amount_human / buy_amount_human
-                price_human = price_human.quantize(decimal.Decimal('1e-8'), rounding=ROUND_DOWN)
+                price_human = price_human.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
 
                 msg = f"链上交易已确认。用{sell_amount_human}买入{buy_amount_human},价格{price_human}。"
                 logging.info(msg)
@@ -245,9 +261,9 @@ class ArbitrageProcess:
         add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
             pseudo_amount_to_sell = self.arbitrage_details["chain_buy_amount"]
-            # pseudo_amount_to_sell = decimal.Decimal('200000')
+            # pseudo_amount_to_sell = Decimal('200000')
             # 处理精度
-            pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(decimal.Decimal('1'), rounding=ROUND_DOWN)
+            pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(Decimal('1'), rounding=ROUND_DOWN)
             
             order_params = {
                 "symbol": self.symbol.replace('_', ''),
@@ -297,6 +313,8 @@ class ArbitrageProcess:
 
                 if order['status'] == "FILLED":
                     msg = f"交易所现货卖出订单已完全成交。{order}"
+                    self.arbitrage_details["exchange_withdraw_amount"] = order['cummulativeQuoteQty']
+
                     logging.info(msg)
                     add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
@@ -373,38 +391,33 @@ class ArbitrageProcess:
         add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
 
         try:
-            times = 10
-            while times > 0:
-                balances = mexc.trade.get_account_info()['balances']
-                for balance in balances:
-                    if balance['asset'] == 'USDT':
-                        pseudo_withdraw_amount = str(int(float(balance['free'])))
-
-                        withdraw_params = {
-                            'coin': 'USDT',
-                            'netWork': 'ETH',
-                            'address': self.user_wallet,
-                            'amount': pseudo_withdraw_amount
-                        }
-                        withdraw_rst = mexc.wallet.post_withdraw(withdraw_params)
-                        if "id" not in withdraw_rst:
-                            logging.error(f"提现失败, 10秒后重试{times}/10")
-                            logging.error(withdraw_params)
-                            logging.error(withdraw_rst)
-
-                        exchange_withdrawal_id = withdraw_rst["id"]
-
-                        msg = f"交易所提现已发送, 提现ID: {exchange_withdrawal_id}"
-                        logging.info(msg)
-                        add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+            # times = 10
+            # while times > 0:
+                # balances = mexc.trade.get_account_info()['balances']
+                # for balance in balances:
+                #     if balance['asset'] == 'USDT':
+            pseudo_withdraw_amount = str(int(float(self.arbitrage_details["exchange_withdraw_amount"])))
+
+            withdraw_params = {
+                'coin': 'USDT',
+                'netWork': 'ETH',
+                'address': self.user_wallet,
+                'amount': pseudo_withdraw_amount
+            }
+            withdraw_rst = mexc.wallet.post_withdraw(withdraw_params)
+            if "id" not in withdraw_rst:
+                logging.error(f"提现失败")
+                logging.error(withdraw_params)
+                logging.error(withdraw_rst)
 
-                        self.arbitrage_details["exchange_withdrawl_id"] = withdraw_rst["id"]
-                        self._set_state(self.STATE_WAITING_WITHDRAWAL_CONFIRM)
+            exchange_withdrawal_id = withdraw_rst["id"]
 
-                        return
-            
-                times = times - 1
-                time.sleep(10)
+            msg = f"交易所提现已发送, 提现ID: {exchange_withdrawal_id}"
+            logging.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+            self.arbitrage_details["exchange_withdrawl_id"] = withdraw_rst["id"]
+            self._set_state(self.STATE_WAITING_WITHDRAWAL_CONFIRM)
 
         except Exception as e:
             msg = f"转账回链上失败:{e}"
@@ -479,7 +492,7 @@ if __name__ == "__main__":
 
     CHAIN_ID = 1
     FROM_TOKEN = '0xdAC17F958D2ee523a2206206994597C13D831ec7'
-    FROM_TOKEN_AMOUNT_HUMAM = decimal.Decimal('20')
+    FROM_TOKEN_AMOUNT_HUMAM = Decimal('20')
     FROM_TOKEN_DECIMAL = 6
     TO_TOKEN = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860'
     USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4'
@@ -503,32 +516,41 @@ if __name__ == "__main__":
             raise Exception("") 
     d = data['data'][0]
     tx = d['tx']
+    router_result = d['routerResult']
+    in_dec, out_dec = int(router_result['fromToken']['decimal']), int(router_result['toToken']['decimal'])
+    atomic_in_base, atomic_out_target = Decimal(router_result['fromTokenAmount']), Decimal(router_result['toTokenAmount'])
+    human_in_base = atomic_in_base / (10 ** in_dec)
+    human_out_target = atomic_out_target / (10 ** out_dec)
+
+    FROM_TOKEN_AMOUNT_HUMAM = human_in_base
+    TO_TOKEN_AMOUNT_HUMAM = human_out_target
+
     pprint.pprint(tx)
 
     # 套利流程执行
     process_item = {
         "stateFlow": [], # 状态流转记录
     }
-    arbitrage_process = ArbitrageProcess(tx, 2, 1.2, 
+    ap = ArbitrageProcess(tx, 2, 1.2, 
                                         FROM_TOKEN, TO_TOKEN, 
-                                        FROM_TOKEN_AMOUNT_HUMAM, 
+                                        FROM_TOKEN_AMOUNT_HUMAM, TO_TOKEN_AMOUNT_HUMAM,
                                         USER_EXCHANGE_WALLET, USER_WALLET,
                                         SYMBOL, process_item)
 
     # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
-    arbitrage_process._set_state(arbitrage_process.STATE_BUYING_ON_CHAIN)
+    ap._set_state(ap.STATE_BUYING_ON_CHAIN)
 
     # 在主循环中周期性调用 run_arbitrage_step
-    while arbitrage_process.current_state != "COMPLETED" and arbitrage_process.current_state != "FAILED":
-        arbitrage_process.run_arbitrage_step()
+    while ap.current_state != "COMPLETED" and ap.current_state != "FAILED":
+        ap.run_arbitrage_step()
 
-        if arbitrage_process.current_state == arbitrage_process.STATE_WAITING_TRANSFER_ARRIVE or arbitrage_process.current_state == arbitrage_process.STATE_WAITING_WITHDRAWAL_CONFIRM:
+        if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
             time.sleep(10)
         else:
             time.sleep(1)
 
     logging.info(process_item)
-    if arbitrage_process.current_state == "COMPLETED":
+    if ap.current_state == "COMPLETED":
         logging.info("套利流程执行成功!")
     else:
         logging.info("套利流程执行失败!")

+ 50 - 6
arbitrage_system.py

@@ -1,7 +1,7 @@
 import decimal
 import threading
 import uuid # 用于生成唯一的流程ID
-import datetime
+import time
 import logging
 import arbitrage_process
 
@@ -55,10 +55,50 @@ def arbitrage_process_flow(process_item):
     在单独线程中执行的实际套利逻辑。
     会直接修改 'process_item' 字典。
     """
-    process_id = process_item['id']
-    symbol = process_item['symbol']
-    onchain_tx_details = process_item['tx'] # 预期包含 'rawTransaction' (原始交易)
-    
+    # process_id = process_item['id']
+    SYMBOL = process_item['symbol']
+    tx = process_item['tx'] # 预期包含 'rawTransaction' (原始交易)
+    FROM_TOKEN = process_item['fromToken']
+    TO_TOKEN = process_item['toToken']
+    FROM_TOKEN_AMOUNT_HUMAM = process_item['fromTokenAmountHuman']
+    TO_TOKEN_AMOUNT_HUMAM = process_item['toTokenAmountHuman']
+    profit = float(process_item['profit'])
+    USER_EXCHANGE_WALLET = process_item['userExchangeWallet']
+    USER_WALLET = process_item['userWallet']
+    SYMBOL = process_item['symbol']
+
+
+    gas_price_multiplier = 1
+    if profit > 2:
+        gas_price_multiplier = 1.2
+    elif profit > 5:
+        gas_price_multiplier = 2
+    elif profit > 10:
+        gas_price_multiplier = 3
+
+    ap = arbitrage_process.ArbitrageProcess(tx, 2, gas_price_multiplier, 
+                                                           FROM_TOKEN, TO_TOKEN, 
+                                                           FROM_TOKEN_AMOUNT_HUMAM, TO_TOKEN_AMOUNT_HUMAM,
+                                                           USER_EXCHANGE_WALLET, USER_WALLET,
+                                                           SYMBOL, process_item)
+
+    # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
+    ap._set_state(ap.STATE_BUYING_ON_CHAIN)
+
+    # 在主循环中周期性调用 run_arbitrage_step
+    while ap.current_state != "COMPLETED" and ap.current_state != "FAILED":
+        ap.run_arbitrage_step()
+
+        if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
+            time.sleep(10)
+        else:
+            time.sleep(1)
+
+    logging.info(process_item)
+    if ap.current_state == "COMPLETED":
+        logging.info("套利流程执行成功!")
+    else:
+        logging.info("套利流程执行失败!")
     
 
 @app.route('/submit_process', methods=['POST'])
@@ -67,7 +107,7 @@ def handle_submit_process():
     if not data:
         return jsonify({"error": "无效的 JSON 请求体"}), 400
 
-    required_fields = ['tx', 'profit', 'profitLimit', 'symbol', 'fromToken', 'fromTokenAmountHuman', 'fromTokenDecimal', 'toToken']
+    required_fields = ['tx', 'profit', 'profitLimit', 'symbol', 'fromToken', 'fromTokenAmountHuman', 'fromTokenDecimal', 'toToken', 'toTokenAmountHuman']
     for field in required_fields:
         if field not in data:
             return jsonify({"error": f"缺少字段: {field}"}), 400
@@ -77,6 +117,7 @@ def handle_submit_process():
         profit_limit = decimal.Decimal(str(data['profitLimit']))                        # 利润阈值
         from_token_amount_human = decimal.Decimal(str(data['fromTokenAmountHuman']))    # fromToken 的人类可读数量
         from_token_decimal = decimal.Decimal(str(data['fromTokenDecimal']))             # fromToken 的小数位数
+        to_token_amount_human = decimal.Decimal(str(data['toTokenAmountHuman']))        # toToken 的人类可读数量
     except (decimal.InvalidOperation, ValueError) as e:
         return jsonify({"error": f"请求体中包含无效的小数/整数值: {e}"}), 400
 
@@ -98,9 +139,12 @@ def handle_submit_process():
             "profit": str(profit), # 利润 (字符串存储)
             "profitLimit": str(profit_limit), # 利润阈值 (字符串存储)
             "symbol": symbol, # 交易对
+            "userWallet": USER_WALLET,
+            "userExchangeWallet": USER_EXCHANGE_WALLET,
             "fromToken": data['fromToken'], # 起始代币
             "fromTokenAmountHuman": str(from_token_amount_human), # 起始代币数量 (人类可读, 字符串存储)
             "fromTokenDecimal": from_token_decimal, # 起始代币小数位数
+            "toTokenAmountHuman": str(to_token_amount_human),
             "toToken": data['toToken'], # 目标代币
             "stateFlow": [], # 状态流转记录
             "currentState": "PENDING_START", # 当前状态

+ 1 - 1
price_checker_ok.py

@@ -18,7 +18,7 @@ IN_TOKEN_ADDRESS = '0xdAC17F958D2ee523a2206206994597C13D831ec7' # USDT on Ethere
 IN_TOKEN_DECIMALS = 6
 IN_AMOUNT_TO_QUERY = decimal.Decimal('20')
 OUT_TOKEN_ADDRESS = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860' # RATO on Ethereum
-USER_WALLET = '0xb1f33026db86a86372493a3b124d7123e9045bb4'
+USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4'
 USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
 SLIPPAGE = 1
 MEXC_TARGET_PAIR_USDT = 'RATO_USDT' # MEXC 现货交易对

+ 10 - 3
submit_process_demo.py

@@ -16,7 +16,7 @@ def create_mock_arbitrage_data():
     IN_TOKEN_DECIMALS = 6
     IN_AMOUNT_TO_QUERY = Decimal('20')
     OUT_TOKEN_ADDRESS = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860' # RATO on Ethereum
-    USER_WALLET = '0xb1f33026db86a86372493a3b124d7123e9045bb4'
+    USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4'
     USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
     SLIPPAGE = 1
     MEXC_TARGET_PAIR_USDT = 'RATO_USDT' # MEXC 现货交易对
@@ -33,6 +33,12 @@ def create_mock_arbitrage_data():
 
     d = data['data'][0]
     tx = d['tx']
+    router_result = d['routerResult']
+    in_dec, out_dec = int(router_result['fromToken']['decimal']), int(router_result['toToken']['decimal'])
+    atomic_in_base, atomic_out_target = Decimal(router_result['fromTokenAmount']), Decimal(router_result['toTokenAmount'])
+    human_in_base = atomic_in_base / (10 ** in_dec)
+    human_out_target = atomic_out_target / (10 ** out_dec)
+
     # 构造提交给 arb_executor 的数据体
     data = {
         "tx": tx,
@@ -40,9 +46,10 @@ def create_mock_arbitrage_data():
         "profitLimit": str(0.01),
         "symbol": MEXC_TARGET_PAIR_USDT,
         "fromToken": IN_TOKEN_ADDRESS,
-        "fromTokenAmountHuman": str(IN_AMOUNT_TO_QUERY),
+        "fromTokenAmountHuman": str(human_in_base),
         "fromTokenDecimal": IN_TOKEN_DECIMALS,                  
-        "toToken": OUT_TOKEN_ADDRESS
+        "toToken": OUT_TOKEN_ADDRESS,
+        "toTokenAmountHuman": str(human_out_target)
     }
     return data