Browse Source

stateFlow的处理。

skyfffire 5 months ago
parent
commit
091de0dcb8
2 changed files with 129 additions and 102 deletions
  1. 123 43
      arbitrage_process.py
  2. 6 59
      arbitrage_system.py

+ 123 - 43
arbitrage_process.py

@@ -1,5 +1,6 @@
 import time
 import logging
+import datetime
 from web3_py_client import EthClient
 from mexc_client import MexcClient
 from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN
@@ -11,21 +12,37 @@ mexc = MexcClient()
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 
-def truncate_float_string(number, digits):
-  """
-  硬截取浮点数,保留指定小数位数 (使用字符串格式化辅助)
-  """
-  stepper = 10.0 ** digits
-  # 将数乘以 stepper,转换为整数进行截断,然后除回 stepper
-  truncated_number = int(number * stepper) / stepper
-  return truncated_number
-
+def get_formatted_timestamp():
+    """
+    获取指定格式的时间戳: YYYY-MM-DD HH:MM:SS,ms
+    例如: 2025-05-16 14:44:09,324
+    """
+    now = datetime.datetime.now()
+    # 格式化日期和时间部分
+    timestamp_str = now.strftime("%Y-%m-%d %H:%M:%S")
+    # 获取毫秒部分,并格式化为3位数字
+    milliseconds = now.microsecond // 1000
+    milliseconds_str = f"{milliseconds:03d}"
+    # 组合最终格式
+    return f"{timestamp_str},{milliseconds_str}"
+
+def add_state_flow_entry(process_item, state_name, msg, status_val="pending"):
+    """辅助函数,用于向 stateFlow 列表添加条目。"""
+    entry = {
+        "stateName": state_name, # 状态名称
+        "timestamp": get_formatted_timestamp(), # 时间戳
+        "msg": msg, # 消息
+        "status": status_val # 状态值: "pending", "success", "fail", "skipped"
+    }
+    process_item["stateFlow"].append(entry)
+    process_item["currentState"] = state_name # 更新整体状态
+    # logging.info(f"[流程 {process_item.get('id', 'N/A')}][{state_name}]: {msg} (状态: {status_val})")
 
 class ArbitrageProcess:
     def __init__(self, tx, gas_limit_multiplier, gas_price_multiplier, 
                  from_token, to_token, from_token_amount_human, 
                  user_exchange_wallet, user_wallet,
-                 symbol):
+                 symbol, process_item):
         """
         初始化套利流程
 
@@ -51,6 +68,7 @@ class ArbitrageProcess:
         self.symbol = symbol
         self.coin = symbol.split('_')[0]
         self.base_coin = symbol.split('_')[1]
+        self.process_item = process_item
 
         # 存储当前套利交易的细节信息,例如买入数量、价格等
         chain_usdt_use = decimal.Decimal(from_token_amount_human)
@@ -151,7 +169,9 @@ class ArbitrageProcess:
         """
         在链上执行买入操作
         """
-        logging.info("执行:链上买入操作...")
+        msg = "执行:链上买入操作..."
+        logging.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
             self.arbitrage_details["chain_amount_before_trade"] = web3.get_erc20_balance(self.to_token_addr, self.user_exchange_wallet)
             # 调用链上客户端执行买入交易
@@ -162,12 +182,16 @@ class ArbitrageProcess:
             )
 
             # 交易成功
-            logging.info(f"链上买入交易已发送,交易哈希:{chain_buy_tx_hash}")
+            msg = f"链上买入交易已发送,交易哈希:{chain_buy_tx_hash}"
+            logging.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
             self.arbitrage_details["chain_buy_tx_hash"] = chain_buy_tx_hash
             self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
 
         except Exception as e:
-            logging.error(f"链上买入失败:{e}")
+            msg = f"链上买入失败:{e}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
             self._set_state("FAILED")
 
     def _wait_chain_confirm(self):
@@ -176,13 +200,14 @@ class ArbitrageProcess:
         """
 
         hash = self.arbitrage_details["chain_buy_tx_hash"]
-        logging.info(f"等待链上交易确认:{hash}")
+        msg = f"等待链上交易确认:{hash}"
+        logging.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
             # 查询链上交易确认状态
             receipt = web3.wait_for_transaction_receipt(hash, timeout=300)
 
             if receipt.status == 1:
-                logging.info("链上交易已确认。")
                 # 在这里根据实际链上交易结果更新实际买入数量,用于后续流程
                 # 这里要用确认后数量减去确认前数量,才知道具体买入了多少
                 self.arbitrage_details["chain_amount_after_trade"] = web3.get_erc20_balance(self.to_token_addr, self.user_exchange_wallet)
@@ -195,21 +220,29 @@ class ArbitrageProcess:
                 price_human = sell_amount_human / buy_amount_human
                 price_human = price_human.quantize(decimal.Decimal('1e-8'), rounding=ROUND_DOWN)
 
-                logging.info(f"用{sell_amount_human}买入{buy_amount_human},价格{price_human}。")
+                msg = f"链上交易已确认。用{sell_amount_human}买入{buy_amount_human},价格{price_human}。"
+                logging.info(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "success")
                 self._set_state(self.STATE_SELLING_ON_EXCHANGE)
             else:
-                 logging.error(f"链上交易确认失败:{hash}")
-                 self._set_state("FAILED")
+                msg = f"链上交易确认失败:{hash}"
+                logging.error(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                self._set_state("FAILED")
 
         except Exception as e:
-            logging.error(f"查询链上确认状态时发生错误:{e}")
+            msg = f"查询链上确认状态时发生错误:{e}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
             self._set_state("FAILED")
 
     def _execute_sell_on_exchange(self):
         """
         在中心化交易所卖出现货
         """
-        logging.info("执行:中心化交易所卖出现货...")
+        msg = "执行:中心化交易所卖出现货..."
+        logging.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
             pseudo_amount_to_sell = self.arbitrage_details["chain_buy_amount"]
             # pseudo_amount_to_sell = decimal.Decimal('200000')
@@ -225,16 +258,20 @@ class ArbitrageProcess:
             logging.info(order_params)
             exchange_sell_order = mexc.trade.post_order(order_params)
             if 'orderId' not in exchange_sell_order:
-                logging.error(f"交易所现货卖出下单失败:{exchange_sell_order}")
+                msg = f"交易所现货卖出下单失败:{exchange_sell_order}"
+                logging.error(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                 self._set_state("FAILED")
 
                 return
 
             exchange_sell_order_id = exchange_sell_order['orderId']
             
-            logging.info(f"交易所现货卖出订单已发送, 订单ID: {exchange_sell_order_id}")
+            msg = f"交易所现货卖出订单已发送, 订单ID: {exchange_sell_order_id}"
+            logging.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
             self.arbitrage_details["exchange_sell_order_id"] = exchange_sell_order_id
-            
             self._set_state(self.STATE_WAITING_SELL_CONFIRM)
         except Exception as e:
             logging.error(f"交易所现货卖出下单失败:{e}")
@@ -245,7 +282,9 @@ class ArbitrageProcess:
         等待交易所现货卖出订单确认(完全成交)
         """
         exchange_sell_order_id = self.arbitrage_details["exchange_sell_order_id"]
-        logging.info(f"等待交易所现货卖出订单确认:{exchange_sell_order_id}")
+        msg = f"等待交易所现货卖出订单确认:{exchange_sell_order_id}"
+        logging.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
             # 查询交易所订单状态
             waiting_times = 30
@@ -257,8 +296,9 @@ class ArbitrageProcess:
                 order = mexc.trade.get_order(params)
 
                 if order['status'] == "FILLED":
-                    logging.info(order)
-                    logging.info("交易所现货卖出订单已完全成交。")
+                    msg = f"交易所现货卖出订单已完全成交。{order}"
+                    logging.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
                     self._set_state(self.STATE_TRANSFERRING_TO_CHAIN)
                     return
@@ -270,14 +310,19 @@ class ArbitrageProcess:
                 waiting_times = waiting_times - 1
 
         except Exception as e:
-            logging.error(f"查询交易所现货卖出订单状态时发生错误:{e}")
+            msg = f"查询交易所现货卖出订单状态时发生错误:{e}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
             self._set_state("FAILED")
 
     def _execute_transfer_to_chain(self):
         """
         将交易后获得的计价资产(例如USDT)转账回链上
         """
-        logging.info("执行:交易所计价资产转账回链上...")
+        msg = "执行:交易所计价资产转账回链上..."
+        logging.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+
         try:
             balances = mexc.trade.get_account_info()['balances']
             for balance in balances:
@@ -295,13 +340,19 @@ class ArbitrageProcess:
                         logging.info(withdraw_rst)
 
                     exchange_withdrawal_id = withdraw_rst["id"]
-                    logging.info(f"交易所提现已发送, 提现ID: {exchange_withdrawal_id}")
+
+                    msg = f"交易所提现已发送, 提现ID: {exchange_withdrawal_id}"
+                    logging.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
                     self.arbitrage_details["exchange_withdrawl_id"] = withdraw_rst["id"]
-                    
                     self._set_state(self.STATE_WAITING_WITHDRAWAL_CONFIRM)
 
         except Exception as e:
-            logging.error(f"转账回链上失败:{e}")
+            msg = f"转账回链上失败:{e}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
             self._set_state("FAILED")
 
     def _wait_withdrawal_confirm(self):
@@ -309,7 +360,10 @@ class ArbitrageProcess:
         等待交易所提现到链上确认
         """
         exchange_withdrawl_id = self.arbitrage_details['exchange_withdrawl_id']
-        logging.info(f"等待交易所提现确认:{exchange_withdrawl_id}")
+
+        msg = f"等待交易所提现确认:{exchange_withdrawl_id}"
+        logging.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
         try:
             is_arrived = False
 
@@ -320,7 +374,10 @@ class ArbitrageProcess:
                 withdraw_list = mexc.wallet.get_withdraw_list()
 
                 if not isinstance(withdraw_list, list):
-                    logging.error(f"查询交易所提现状态时发生错误:{e}")
+                    msg = f"查询交易所提现状态时发生错误:{withdraw_list}"
+                    logging.error(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
                     self._set_state("FAILED")
                     return
 
@@ -334,7 +391,9 @@ class ArbitrageProcess:
                         is_arrived = True
 
                 if is_arrived:
-                    logging.info("提现请求已上链")
+                    msg = f"提现请求已上链: {last_deposit_state}"
+                    logging.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
                     self._set_state(self.STATE_WAITING_TRANSFER_ARRIVE)
                     return
@@ -342,17 +401,26 @@ class ArbitrageProcess:
                 time.sleep(1)
                 waiting_times = waiting_times - 1
 
-            logging.error(f"等待提现到账超时(超过20分钟): {last_deposit_state}")
+            msg = f"等待提现到账超时(超过20分钟): {last_deposit_state}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
             self._set_state("FAILED")
         except Exception as e:
-            logging.error(f"查询交易所提现状态时发生错误:{e}")
+            msg = f"查询交易所提现状态时发生错误:{e}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            
             self._set_state("FAILED")
 
     def _wait_transfer_arrive(self):
         """
         等待资产在交易所内到账
         """
-        logging.info(f"等待资产在交易所到账...")
+        msg = f"等待资产在交易所到账..."
+        logging.error(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+
         try:
             is_arrived = False
 
@@ -371,7 +439,9 @@ class ArbitrageProcess:
                         is_arrived = True
 
                 if is_arrived:
-                    logging.info("资产已在交易所到账。")
+                    msg = "资产已在交易所到账。{last_deposit_state}"
+                    logging.error(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "success")
 
                     self._set_state(self.STATE_COMPLETED)
                     return
@@ -379,10 +449,16 @@ class ArbitrageProcess:
                 time.sleep(1)
                 waiting_times = waiting_times - 1
 
-            logging.error(f"等待充值到账超时(超过20分钟): {last_deposit_state}")
+            msg = f"等待充值到账超时(超过20分钟): {last_deposit_state}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
             self._set_state("FAILED")
         except Exception as e:
-            logging.error(f"查询交易所到账状态时发生错误:{e}")
+            msg = f"查询交易所到账状态时发生错误:{e}"
+            logging.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
             self._set_state("FAILED")
 
 # 伪代码示例:如何使用这个类
@@ -420,11 +496,14 @@ if __name__ == "__main__":
     pprint.pprint(tx)
 
     # 套利流程执行
+    process_item = {
+        "stateFlow": [], # 状态流转记录
+    }
     arbitrage_system = ArbitrageProcess(tx, 2, 1.2, 
                                         FROM_TOKEN, TO_TOKEN, 
                                         FROM_TOKEN_AMOUNT_HUMAM, 
                                         USER_EXCHANGE_WALLET, USER_WALLET,
-                                        SYMBOL)
+                                        SYMBOL, process_item)
 
     # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
     arbitrage_system._set_state(arbitrage_system.STATE_BUYING_ON_CHAIN)
@@ -434,7 +513,8 @@ if __name__ == "__main__":
         arbitrage_system.run_arbitrage_step()
         time.sleep(1) # 暂停一段时间执行实际操作延迟
 
+    logging.info(process_item)
     if arbitrage_system.current_state == "COMPLETED":
-        print("套利流程执行成功!")
+        logging.info("套利流程执行成功!")
     else:
-        print("套利流程执行失败!")
+        logging.info("套利流程执行失败!")

+ 6 - 59
arbitrage_system.py

@@ -3,6 +3,8 @@ import threading
 import uuid # 用于生成唯一的流程ID
 import datetime
 import logging
+import arbitrage_process
+
 # 配置日志
 log = logging.getLogger('werkzeug')
 log.setLevel(logging.ERROR)
@@ -48,32 +50,6 @@ except Exception as e:
 # --- Flask 应用 ---
 app = Flask(__name__)
 
-def get_formatted_timestamp():
-    """
-    获取指定格式的时间戳: YYYY-MM-DD HH:MM:SS,ms
-    例如: 2025-05-16 14:44:09,324
-    """
-    now = datetime.datetime.now()
-    # 格式化日期和时间部分
-    timestamp_str = now.strftime("%Y-%m-%d %H:%M:%S")
-    # 获取毫秒部分,并格式化为3位数字
-    milliseconds = now.microsecond // 1000
-    milliseconds_str = f"{milliseconds:03d}"
-    # 组合最终格式
-    return f"{timestamp_str},{milliseconds_str}"
-
-def add_state_flow_entry(process_item, state_name, msg, status_val="pending"):
-    """辅助函数,用于向 stateFlow 列表添加条目。"""
-    entry = {
-        "stateName": state_name, # 状态名称
-        "timestamp": get_formatted_timestamp(), # 时间戳
-        "msg": msg, # 消息
-        "status": status_val # 状态值: "pending", "success", "fail", "skipped"
-    }
-    process_item["stateFlow"].append(entry)
-    process_item["currentState"] = state_name # 更新整体状态
-    logging.info(f"[流程 {process_item.get('id', 'N/A')}][{state_name}]: {msg} (状态: {status_val})")
-
 def arbitrage_process_flow(process_item):
     """
     在单独线程中执行的实际套利逻辑。
@@ -82,36 +58,8 @@ def arbitrage_process_flow(process_item):
     process_id = process_item['id']
     symbol = process_item['symbol']
     onchain_tx_details = process_item['tx'] # 预期包含 'rawTransaction' (原始交易)
-
-    try:
-        pass
-    except Exception as e:
-        logging.info(f"流程 {process_id} ({symbol}) 的套利过程中出错: {e}")
-        # stateFlow 中的最后一个状态应反映错误点
-        process_item['finalStatus'] = "FAILED" # 最终状态:失败
-        # 如果尚未由特定步骤的失败设置
-        if process_item['stateFlow'][-1]['status'] != "fail":
-             add_state_flow_entry(process_item, "PROCESS_FAILED", f"整体流程失败: {e}", "fail")
-
-    finally:
-        # 将条目从 processing_list 移动到 history_process_list
-        with list_lock:
-            # 通过 ID 查找并移除,以确保在列表顺序更改时(尽管仅追加使其不太可能)的健壮性
-            item_to_move = None
-            for i, item in enumerate(processing_list):
-                if item['id'] == process_id:
-                    item_to_move = processing_list.pop(i)
-                    break
-            if item_to_move:
-                history_process_list.append(item_to_move)
-                logging.info(f"流程 {process_id} 已移至历史记录。")
-            else:
-                logging.info(f"警告: 流程 {process_id} 未在 processing_list 中找到,无法移至历史记录。")
-
-        # 更新此交易对的最后处理区块信息 (可选, 用于防止立即重新套利)
-        # current_block = web3.eth.get_block('latest')['number']
-        # last_process_info[symbol] = {"block": current_block, "timestamp": time.time()}
-        # logging.info(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
+    
+    
 
 @app.route('/submit_process', methods=['POST'])
 def handle_submit_process():
@@ -145,7 +93,7 @@ def handle_submit_process():
         process_id = str(uuid.uuid4()) # 生成唯一流程ID
         process_item = {
             "id": process_id,
-            "creationTime": get_formatted_timestamp(), # 创建时间
+            "creationTime": arbitrage_process.get_formatted_timestamp(), # 创建时间
             "tx": data['tx'], # 交易详情,应包含 rawTransaction
             "profit": str(profit), # 利润 (字符串存储)
             "profitLimit": str(profit_limit), # 利润阈值 (字符串存储)
@@ -156,10 +104,9 @@ def handle_submit_process():
             "toToken": data['toToken'], # 目标代币
             "stateFlow": [], # 状态流转记录
             "currentState": "PENDING_START", # 当前状态
-            "finalStatus": "PROCESSING" # 最终状态 (PROCESSING / SUCCESS / FAILED)
         }
         # 初始状态更新
-        add_state_flow_entry(process_item, "RECEIVED", f"流程已接收。利润 {profit} >= 利润阈值 {profit_limit}。开始套利。", "success")
+        arbitrage_process.add_state_flow_entry(process_item, "RECEIVED", f"流程已接收。利润 {profit} >= 利润阈值 {profit_limit}。开始套利。", "success")
 
         with list_lock:
             processing_list.append(process_item)