Quellcode durchsuchen

各種核心數據的更新,對交易略微進行加速。

skyfffire vor 5 Monaten
Ursprung
Commit
4a63b7dfe5

+ 105 - 22
as.py

@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 
-from decimal import Decimal
+from decimal import Decimal, ROUND_DOWN
 import threading
 import uuid # 用于生成唯一的流程ID
 import time
@@ -13,6 +13,7 @@ from flask_cors import CORS # 导入
 from as_utils import get_formatted_timestamp
 from as_utils import add_state_flow_entry
 from config import wallet
+from binance.client import Client # 用于获取ETH价格
 
 # 配置日志
 log = logging.getLogger('werkzeug')
@@ -20,38 +21,47 @@ log.setLevel(logging.ERROR)
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 web3 = web3_py_client.EthClient() 
+w3 = web3.w3
 
 USER_WALLET = wallet["user_wallet"]
 USER_EXCHANGE_WALLET = wallet["user_exchange_wallet"]
 
 # 该代币最后一次执行套利的区块信息 (如果需要防止过于频繁的同类套利,不然变成砸盘、拉盘的了)
 last_process_info = {} # 示例: {"RATO_USDT": 0}
-MIN_BLOCKS_BETWEEN_ARB = Decimal(5) # 在重试相同交易对之前等待几个区块
+MIN_BLOCKS_BETWEEN_ARB = Decimal(2) # 在重试相同交易对之前等待几个区块
 
 # --- 全局状态和锁 ---
 processing_list = [] # 正在处理的任务列表
 history_process_list = [] # 已完成的任务历史列表
 list_lock = threading.Lock() # 用于修改 processing_list 和 history_process_list 结构的锁
 
-# USER_WALLET 的 Nonce 管理,前提是此服务器为该钱包发起交易。
-# 如果传入的 'tx' 已预签名并包含 nonce,则此全局 nonce 对该特定 tx 不太重要。
-# 但如果此服务器要从 USER_WALLET 发起*其他*交易(例如代币批准),则此 nonce 很重要。
-global_nonce_USER_WALLET = 0 # 从 USER_WALLET 发送交易的全局 Nonce
-nonce_lock_USER_WALLET = threading.Lock() # USER_WALLET Nonce 的锁
+
+core_data = {
+    "nonce": 0,                                     # 全局 Nonce
+    "block_number": 0,                              # 全局 區塊號
+    "eth_price": 0,                                 # eth價格
+}
+core_lock = threading.Lock()                        # 核心數據的锁
 
 try:
-    if web3.w3.provider: # 检查 web3 是否已某种程度初始化
-        # 这个全局 nonce 应该小心初始化。
-        # 如果 price_checker 发送交易,它应该管理 USER_WALLET 的 tx 的 nonce。
-        # 这个服务器的 global_nonce 是针对它自己可能创建的 tx。
-        # 暂时假设传入的 TX 具有其 nonce 或 price_checker 处理了它。
-        global_nonce_USER_WALLET = web3.w3.eth.get_transaction_count(USER_WALLET, 'latest')
-        logging.info(f"如果服务器要创建交易,{USER_WALLET} 的初始 nonce 将在此处获取。")
+    if w3.provider:
+        # global_nonce_USER_WALLET = w3.eth.get_transaction_count(USER_WALLET, 'latest')
+        logging.info("Web3 已连接。")
     else:
-        logging.info("Web3 提供者未连接, USER_WALLET 的全局 nonce 未初始化。")
+        logging.info("Web3 未连接。")
 except Exception as e:
     logging.info(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}")
 
+# Binance 客户端 (无需API Key/Secret即可获取公开行情数据)
+try:
+    binance_client = Client()
+    # 测试连接 (可选,但建议)
+    binance_client.ping()
+    logging.info("成功连接到 Binance API。")
+except Exception as e:
+    logging.error(f"初始化 Binance Client 时发生错误: {e}")
+    binance_client = None
+
 # --- Flask 应用 ---
 app = Flask(__name__)
 CORS(app) # 在创建 app 实例后启用 CORS
@@ -94,6 +104,7 @@ def move_completed_process_to_history(process_id_to_move: str) -> bool:
             
     return moved_successfully
 
+# 實際套利邏輯
 def arbitrage_process_flow(process_item):
     """
     在单独线程中执行的实际套利逻辑。
@@ -114,6 +125,7 @@ def arbitrage_process_flow(process_item):
     EXCHANGE_OUT_AMOUNT = process_item['exchangeOutAmount']
 
 
+    gas_limit_multiplier = 1
     gas_price_multiplier = 1
     if profit > Decimal(2) * profitLimit:
         gas_price_multiplier = 1.2
@@ -121,9 +133,10 @@ def arbitrage_process_flow(process_item):
         gas_price_multiplier = 1.5
     elif profit > Decimal(10) * profitLimit:
         gas_price_multiplier = 2
-    gas_limit_multiplier = 1.2
 
-    ap = erc20_to_mexc_first_sell.ArbitrageProcess(tx, gas_limit_multiplier, gas_price_multiplier, process_item)
+    global core_data
+    global core_lock
+    ap = erc20_to_mexc_first_sell.ArbitrageProcess(tx, gas_limit_multiplier, gas_price_multiplier, process_item, core_data, core_lock)
 
     # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
     ap._set_state(ap.STATE_CHECK)
@@ -139,6 +152,71 @@ def arbitrage_process_flow(process_item):
 
     move_completed_process_to_history(process_id)
     
+# --- 核心數據更新綫程函數 ---
+def update_core_data_periodically():
+    """
+    周期性更新 nonce 和 ETH 价格的线程函数。
+    """
+    global core_data # 明确表示我们要修改全局的 core_data
+
+    if not USER_WALLET or USER_WALLET == "你的钱包地址":
+        logging.error("USER_WALLET 未正确配置。nonce 更新将无法进行。")
+        # 如果 USER_WALLET 未配置,可以考虑让线程不执行 nonce 更新,或者直接退出
+        # 这里我们选择继续运行,但 nonce 不会被更新
+
+    while True:
+        try:
+            new_eth_price = None
+            new_nonce = None
+            new_block_number = None
+
+            # 1. 从 Binance 获取 ETH 价格
+            if binance_client:
+                try:
+                    ticker = binance_client.get_symbol_ticker(symbol="ETHUSDT")
+                    new_eth_price = float(ticker['price'])
+                except Exception as e:
+                    logging.error(f"从 Binance 获取 ETH 价格失败: {e}")
+            else:
+                logging.warning("Binance client 未初始化, 无法获取 ETH 价格。")
+
+            # 2. 获取最新的 Nonce 和 最新的block_number
+            # 确保 w3 已初始化且 USER_WALLET 已配置
+            if w3 and w3.is_connected() and USER_WALLET and USER_WALLET != "你的钱包地址":
+                try:
+                    new_block_number = w3.eth.block_number
+                    new_nonce = w3.eth.get_transaction_count(USER_WALLET, 'latest')
+                except Exception as e:
+                    logging.error(f"为 {USER_WALLET} 获取 Nonce 失败: {e}")
+            elif not (w3 and w3.is_connected()):
+                logging.warning("Web3 未连接, 无法获取 nonce。")
+            elif not (USER_WALLET and USER_WALLET != "你的钱包地址"):
+                logging.warning("USER_WALLET 配置不正确, 无法获取 nonce。")
+
+            # 3. 更新共享数据 core_data (使用锁)
+            # 只有当获取到新数据时才更新,避免不必要的写操作和日志
+            with core_lock:
+                if new_eth_price is not None and core_data["eth_price"] != new_eth_price:
+                    eth_price = Decimal(new_eth_price)
+                    eth_price = eth_price.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
+                    core_data["eth_price"] = eth_price
+                
+                # 判斷block_number是否發生變化(升高)
+                if new_block_number is not None and new_block_number > core_data["block_number"]:
+                    core_data["block_number"] = new_block_number
+                
+                    # 區塊變了才刷新nonce,否則還是要靠本地的緩存維護
+                    if new_nonce is not None and core_data["nonce"] != new_nonce:
+                        core_data["nonce"] = new_nonce
+            
+            logging.info(f"核心数据已更新: ETH Price = {core_data['eth_price']}, Nonce ({USER_WALLET}) = {core_data['nonce']}, BlockNumber = {core_data['block_number']}")
+
+        except Exception as e:
+            # 捕获线程循环中的其他潜在错误
+            logging.error(f"数据更新线程发生未知错误: {e}")
+        
+        # 等待 500ms
+        time.sleep(0.5)
 
 @app.route('/submit_process', methods=['POST'])
 def handle_submit_process():
@@ -165,10 +243,11 @@ def handle_submit_process():
 
     # 检查此交易对此区块是否处理过
     last_trade_block = last_process_info.get(symbol)
-    current_block = web3.w3.eth.block_number
-    if last_trade_block:
-        if current_block - last_trade_block < MIN_BLOCKS_BETWEEN_ARB:
-            return jsonify({"message": f"已跳过: {symbol} 最近已处理 (区块 {last_trade_block}). 当前区块 {current_block}."}), 200
+    with core_lock:
+        current_block = core_data['block_number']
+        if last_trade_block:
+            if current_block - last_trade_block < MIN_BLOCKS_BETWEEN_ARB:
+                return jsonify({"message": f"已跳过: {symbol} 最近已处理 (区块 {last_trade_block}). 当前区块 {current_block}."}), 200
 
     if profit >= profit_limit:
         process_id = str(uuid.uuid4()) # 生成唯一流程ID
@@ -232,6 +311,10 @@ def get_status():
         })
 
 if __name__ == "__main__":
-    # 如果此服务器为其自身的交易管理 global_nonce_USER_WALLET,则在此处初始化
+    logging.info("启动核心数据更新线程...")
+    updater_thread = threading.Thread(target=update_core_data_periodically, daemon=True)
+    updater_thread.start()
+    logging.info("主线程继续执行,可以执行其他任务或保持运行以观察数据更新。")
+
     logging.info("启动 Flask 套利执行服务器...")
     app.run(host='0.0.0.0', port=188, debug=False) # 使用与 price_checker 不同的端口

+ 0 - 0
signal/__init__.py → checker/__init__.py


+ 0 - 0
signal/config.py.sample → checker/config.py.sample


+ 0 - 0
signal/ok_chain_client.py → checker/ok_chain_client.py


+ 0 - 0
signal/price_checker.py → checker/price_checker.py


+ 0 - 0
signal/submit_process_demo.py → checker/submit_process_demo.py


+ 0 - 0
signal/templates/index_plotly_dynamic_ok.html → checker/templates/index_plotly_dynamic_ok.html


+ 27 - 5
erc20_to_mexc_first_sell.py

@@ -12,7 +12,7 @@ mexc = MexcClient()
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 class ArbitrageProcess:
-    def __init__(self, tx, gas_limit_multiplier, gas_price_multiplier, process_item):
+    def __init__(self, tx, gas_limit_multiplier, gas_price_multiplier, process_item, core_data, core_lock):
         """
         初始化套利流程
 
@@ -21,13 +21,25 @@ class ArbitrageProcess:
             gas_limit_multiplier: gas limit倍数, 一般都不加倍
             gas_price_multiplier: gas price倍数, 可以提高交易成功率
         """
+        self.WITHDRAW_FEE = Decimal(0.5)                        # 提現手續費
+
         tx.pop('gasPrice', None)
         tx.pop('value', None)
         tx.pop('minReceiveAmount', None)
         tx.pop('slippage', None)
         tx.pop('maxSpendAmount', None)
         tx.pop('signatureData', None)
+
+        self.core_data = core_data
+        self.core_lock = core_lock
+
+        with self.core_lock:
+            self.eth_price = self.core_data['eth_price']
+
         self.tx = tx
+        
+        self.profit = Decimal(process_item['profit'])               # 這個利潤是實際到手利潤
+        self.profit_limit = Decimal(process_item['profitLimit'])    # 這個利潤是實際到手利潤的limit
 
         self.gas_limit_multiplier = gas_limit_multiplier
         self.gas_price_multiplier = gas_price_multiplier
@@ -197,14 +209,15 @@ class ArbitrageProcess:
             add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
             # step3, 費用與利潤比較
-            LIMIT = 0.005
-            if estimated_eth > LIMIT:
-                msg = f"TODO: 費用判斷不通過,費用還是固定值,建議改成與ETH價格挂鈎的值,方便設定利潤!LIMIT:{LIMIT}, estimated_eth: {estimated_eth}"
+            estimated_eth_value = estimated_eth * self.eth_price
+            cost = estimated_eth_value + self.WITHDRAW_FEE          # 成本
+            if self.profit - cost > self.profit_limit:
+                msg = f"費用判斷不通過! eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}, profit: {self.profit}, profit_limit: {self.profit_limit}"
                 logging.info(msg)
                 add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                 self._set_state(self.STATE_REJECT)
                 return
-            msg = f"TODO: 費用判斷通過,費用還是固定值,建議改成與ETH價格挂鈎的值,方便設定利潤!LIMIT:{LIMIT}, estimated_eth: {estimated_eth}"
+            msg = f"費用判斷通過! eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}, profit: {self.profit}, profit_limit: {self.profit_limit}"
             logging.info(msg)
             add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
@@ -332,12 +345,21 @@ class ArbitrageProcess:
         add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:                
             self.chain_amount_before_trade = web3.get_erc20_balance(self.to_token_addr, self.user_exchange_wallet)
+
+            # 交易前nonce
+            with self.core_lock:
+                self.tx['nonce'] = self.core_data['nonce']
+            
             # 调用链上客户端执行买入交易
             self.chain_buy_tx_hash = web3._sign_and_send_transaction(
                 self.tx,
                 self.gas_limit_multiplier
             )
 
+            # 交易成功后刷新全局nonce
+            with self.core_lock:
+                self.core_data['nonce'] = self.core_data['nonce'] + 1
+
             # 交易成功
             msg = f"链上买入交易已发送,交易哈希:{self.chain_buy_tx_hash}"
             logging.info(msg)