Browse Source

外面緩存已經寫好了,策略裏準備抓取

skyfffire 5 months ago
parent
commit
e3dae00475
3 changed files with 106 additions and 17 deletions
  1. 91 6
      as.py
  2. 13 10
      erc20_to_mexc.py
  3. 2 1
      toto.readme

+ 91 - 6
as.py

@@ -23,6 +23,7 @@ from as_utils import add_state_flow_entry
 from config import wallet
 from binance.client import Client # 用于获取ETH价格
 from checker import ok_chain_client
+from mexc_client import MexcClient
 from pprint import pprint
 from pprint import pformat
 
@@ -38,6 +39,7 @@ log.setLevel(logging.ERROR)
 
 web3 = web3_py_client.EthClient() 
 w3 = web3.w3
+mexc = MexcClient()
 
 USER_WALLET = wallet["user_wallet"]
 USER_EXCHANGE_WALLET = wallet["user_exchange_wallet"]
@@ -54,8 +56,9 @@ list_lock = threading.Lock() # 用于修改 processing_list 和 history_process_
 # --- 一些核心數據和鎖 ---
 core_data = {
     "nonce": 0,                                     # 全局 Nonce
+    "eth_balance": Decimal(0),                      # 全局 eth餘額
+    "eth_price": 0,                                 # 全局 eth價格
     "block_number": 0,                              # 全局 區塊號
-    "eth_price": 0,                                 # eth價格
 }
 core_lock = threading.Lock()                        # 核心數據的锁
 
@@ -71,6 +74,14 @@ pending_data = {
 pending_lock = threading.Lock()
 PENDING_CONFIRM_BLOCK = 3                           # 需要幾個區塊才進行確認
 
+# --- mexc相關數據和鎖 ---
+mexc_data = {
+    "account_info": {},
+    "deposit_list": [],
+    "withdraw_list": [],
+}
+mexc_lock = threading.Lock()
+
 CHAIN_ID = -1
 try:
     if w3.provider:
@@ -201,6 +212,7 @@ def update_core_data_periodically():
     while True:
         try:
             new_eth_price = None
+            new_eth_balance = None
             new_nonce = None
             new_block_number = None
 
@@ -214,14 +226,17 @@ def update_core_data_periodically():
             else:
                 logger.warning("Binance client 未初始化, 无法获取 ETH 价格。")
 
-            # 2. 获取最新的 Nonce 和 最新的block_number
+            # 2. 获取最新的 Nonce 和 最新的block_number 以及 最新的賬戶eth餘額
             # 确保 w3 已初始化且 USER_WALLET 已配置
             if w3 and w3.is_connected() and USER_WALLET and USER_WALLET != "你的钱包地址":
                 try:
                     new_block_number = w3.eth.block_number
                     new_nonce = w3.eth.get_transaction_count(USER_WALLET, 'latest')
+                    eth_balance_origin = w3.eth.get_balance(USER_WALLET)
+                    new_eth_balance = Decimal(eth_balance_origin / (10 ** 18))
+                    new_eth_balance = new_eth_balance.quantize(Decimal('1e-6'), rounding=ROUND_DOWN)
                 except Exception as e:
-                    logger.error(f"为 {USER_WALLET} 获取 Nonce 失败: {e}")
+                    logger.error(f"为 {USER_WALLET} 获取 Nonce、BlockNumber、EthBalances 失败: {e}")
             elif not (w3 and w3.is_connected()):
                 logger.warning("Web3 未连接, 无法获取 nonce。")
             elif not (USER_WALLET and USER_WALLET != "你的钱包地址"):
@@ -242,8 +257,12 @@ def update_core_data_periodically():
                     # 區塊變了才刷新nonce,否則還是要靠本地的緩存維護
                     if new_nonce is not None and core_data["nonce"] != new_nonce:
                         core_data["nonce"] = new_nonce
+
+                    # 餘額也同理
+                    if new_eth_balance is not None and core_data["eth_balance"] != new_eth_balance:
+                        core_data["eth_balance"] = new_eth_balance
             
-            # logger.info(f"核心数据已更新: ETH Price = {core_data['eth_price']}, Nonce ({USER_WALLET}) = {core_data['nonce']}, BlockNumber = {core_data['block_number']}")
+            # logger.info(f"核心数据已更新: ETH Price = {core_data['eth_price']}, Nonce ({USER_WALLET}) = {core_data['nonce']}, EthBalance={core_data['eth_balance']}, BlockNumber = {core_data['block_number']}")
 
         except Exception as e:
             # 捕获线程循环中的其他潜在错误
@@ -253,9 +272,71 @@ def update_core_data_periodically():
         # 等待 500ms
         time.sleep(0.5)
 
+# --- mexc數據更新綫程函數 ---
+def update_mexc_data_periodically():
+    """
+    周期性更新 mexc的相關數據 的线程函数。
+    """
+    global mexc_data
+
+    while True:
+        try:
+            new_account_info = None
+            new_withdraw_list = None
+            new_deposit_list = None
+
+            # 1. new_account_info
+            try:
+                new_account_info = mexc.trade.get_account_info()
+
+                if 'balances' not in new_account_info:
+                    raise Exception("未找到balances")
+
+                with mexc_lock:
+                    mexc_data['account_info'] = new_account_info
+
+                    # logger.info(f'account_info: {new_account_info['balances']}')
+            except Exception as e:
+                logger.error(f"从 Mexc 获取 Balance 失败: {e}, {new_account_info}")
+
+            # 2. new_deposit_list
+            try:
+                new_deposit_list = mexc.wallet.get_deposit_list()
+
+                if not isinstance(new_deposit_list, list):
+                    raise Exception("充值信息獲取錯誤")
+
+                with mexc_lock:
+                    mexc_data['deposit_list'] = new_deposit_list
+
+                    # logger.info(f'deposit_list: {new_deposit_list[0]}')
+            except Exception as e:
+                logger.error(f"从 Mexc 获取 deposit_list 失败: {e}, {new_deposit_list}")
+
+            # 3. new_withdraw_list
+            try:
+                new_withdraw_list = mexc.wallet.get_withdraw_list()
+
+                if not isinstance(new_withdraw_list, list):
+                    raise Exception("提現信息獲取錯誤")
+
+                with mexc_lock:
+                    mexc_data['withdraw_list'] = new_withdraw_list
+
+                    # logger.info(f'withdraw_list: {new_withdraw_list[0]}')
+            except Exception as e:
+                logger.error(f"从 Mexc 获取 withdraw_list 失败: {e}, {new_withdraw_list}")
+
+        except Exception as e:
+            # 捕获线程循环中的其他潜在错误
+            logger.error(f"数据更新线程发生未知错误: {e}")
+            traceback.print_exc()
+        
+        # 等待 1s
+        time.sleep(1)
 
 # --- tx pending數據獲取綫程函數 ---
-def update_tx_data():
+def update_tx_data_periodically():
     """
     每一秒獲取一條tx數據
     """
@@ -441,8 +522,12 @@ if __name__ == "__main__":
     updater_thread = threading.Thread(target=update_core_data_periodically, daemon=True)
     updater_thread.start()
 
+    logger.info("启动抹茶数据更新线程...")
+    updater_thread = threading.Thread(target=update_mexc_data_periodically, daemon=True)
+    updater_thread.start()
+
     logger.info("启动pending信息獲取线程...")
-    pending_thread = threading.Thread(target=update_tx_data, daemon=True)
+    pending_thread = threading.Thread(target=update_tx_data_periodically, daemon=True)
     pending_thread.start()
 
     logger.info("主线程继续执行,可以执行其他任务或保持运行以观察数据更新。")

+ 13 - 10
erc20_to_mexc.py

@@ -237,16 +237,19 @@ class ArbitrageProcess:
             add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
             # step4, 與賬戶eth餘額比對(至少留0.001,不然沒gas了)
-            MARGIN = 0.001
-            eth_balance_origin = web3.w3.eth.get_balance(self.user_wallet)
-            eth_balance = Decimal(eth_balance_origin / (10 ** 18))
-            eth_balance = eth_balance.quantize(Decimal('1e-6'), rounding=ROUND_DOWN)
-            if eth_balance - estimated_eth < MARGIN:
-                msg = f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
-                logger.info(msg)
-                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
-                self._set_state(self.STATE_REJECT)
-                return
+            MARGIN = Decimal(0.001)
+            # 暫時鎖住core_data
+            with self.core_lock:
+                eth_balance = self.core_data['eth_balance']
+                if eth_balance - estimated_eth < MARGIN:
+                    msg = f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
+                    logger.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                    self._set_state(self.STATE_REJECT)
+                    return
+                
+                # 餘額判斷通過后預扣除balance,防止綫程更新不及時導致其他綫程誤發送tx
+                self.core_data['eth_balance'] = self.core_data['eth_balance'] - estimated_eth
             msg = f"gas餘額判斷通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
             logger.info(msg)
             add_state_flow_entry(self.process_item, self.current_state, msg, "success")

+ 2 - 1
toto.readme

@@ -12,7 +12,8 @@
 [-] 鏈上買入成功,但是okx返回的空狀態,詳見2025-6-5學費
 [-] 解密HASH進行鑒權
 [-] json用pprint美化后輸出
-[ ] 查询交易所到账状态时发生错误:'balances'
+[-] ETH餘額緩存
+[ ] 交易所數據緩存
 
 2025-06-07
 [ ] 做另一個方向之前,需要先整理策略層架構,當前架構如何兼容多策略