Эх сурвалжийг харах

套利系统的初步架构

skyfffire 5 сар өмнө
parent
commit
976db4f44b
1 өөрчлөгдсөн 188 нэмэгдсэн , 36 устгасан
  1. 188 36
      arbitrage_system.py

+ 188 - 36
arbitrage_system.py

@@ -1,38 +1,190 @@
-from decimal import Decimal
-from web3_py_client import EthClient
-web3 = EthClient()
-
-# 其它的配置应该写入到price_checker.py里面去
-USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4'
-USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
-
-# 该代币最后一次执行套利的区块
-last_process_info = {
-    "RATO_USDT": 0
-}
-
-nonce = web3.eth.get_transaction_count(USER_WALLET, 'latest')
-
-# 从price_checker打一个单子到这个系统里,添加到processing_list里面给人类查看
-# 正在处理列表 demo
-processing_list = {
-    {
-        "tx": {},
-        "spread": 0.01,                         # 价差,负数则代表链上价格落后于交易所价格
-        "symbol": "RATO_USDT",                  # 交易币对、从price_checker来的
-        "fromToken": "xxx",
-        "fromTokenAmountHuman": Decimal('10'),
-        "fromTokenDecimal": 6,
-        "toToken": "xxx",
-        "stateFlow": [                          # 状态、时间、msg、state
-            { "state": "BUYING_ON_CHAIN", "timestamp": "时间戳或者时间字符串都行", "msg": "正在链上买入RATO", "state": "success/fail" },
-            { "state": "WAITING_CHAIN_CONFIRM", "timestamp": "时间戳或者时间字符串都行", "msg": "成功花费x USDT买入xRATO/失败买入", "state": "success/fail" },
-            { "state": "TRANSFERRING_TO_EXCHANGE", "timestamp": "时间戳或者时间字符串都行", "msg": "向交易所转账x RATO发送成功", "state": "success/fail" },
-            # ...
-        ],
-        "state": "IDLE"
+import decimal
+import threading
+import uuid # 用于生成唯一的流程ID
+from datetime import datetime, timezone
+
+from flask import Flask, request, jsonify
+from web3_py_client import EthClient # 你特定的客户端
+
+
+web3 = EthClient() 
+
+USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4' # 用户钱包地址
+USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA' # 用户在交易所的钱包地址 (用于充值)
+
+# 该代币最后一次执行套利的区块信息 (如果需要防止过于频繁的同类套利,不然变成砸盘、拉盘的了)
+last_process_info = {} # 示例: {"RATO_USDT": 0}
+MIN_BLOCKS_BETWEEN_ARB = 1 # 在重试相同交易对之前等待1个区块
+
+# --- 全局状态和锁 ---
+processing_list = [] # 正在处理的任务列表
+history_process_list = [] # 已完成的任务历史列表
+list_lock = threading.Lock() # 用于修改 processing_list 和 history_process_list 结构的锁
+
+# USER_WALLET 的 Nonce 管理,前提是此服务器为该钱包发起交易。
+# 如果传入的 'tx' 已预签名并包含 nonce,则此全局 nonce 对该特定 tx 不太重要。
+# 但如果此服务器要从 USER_WALLET 发起*其他*交易(例如代币批准),则此 nonce 很重要。
+global_nonce_USER_WALLET = 0 # 从 USER_WALLET 发送交易的全局 Nonce
+nonce_lock_USER_WALLET = threading.Lock() # USER_WALLET Nonce 的锁
+
+try:
+    if web3.provider: # 检查 web3 是否已某种程度初始化
+        # 这个全局 nonce 应该小心初始化。
+        # 如果 price_checker 发送交易,它应该管理 USER_WALLET 的 tx 的 nonce。
+        # 这个服务器的 global_nonce 是针对它自己可能创建的 tx。
+        # 暂时假设传入的 TX 具有其 nonce 或 price_checker 处理了它。
+        global_nonce_USER_WALLET = web3.eth.get_transaction_count(USER_WALLET, 'latest')
+        print(f"如果服务器要创建交易,{USER_WALLET} 的初始 nonce 将在此处获取。")
+    else:
+        print("Web3 提供者未连接, USER_WALLET 的全局 nonce 未初始化。")
+except Exception as e:
+    print(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}")
+
+# --- Flask 应用 ---
+app = Flask(__name__)
+
+def get_current_timestamp_iso():
+    """获取当前 UTC 时间的 ISO 格式字符串"""
+    return datetime.now(timezone.utc).isoformat()
+
+def add_state_flow_entry(process_item, state_name, msg, status_val="pending"):
+    """辅助函数,用于向 stateFlow 列表添加条目。"""
+    entry = {
+        "stateName": state_name, # 状态名称
+        "timestamp": get_current_timestamp_iso(), # 时间戳
+        "msg": msg, # 消息
+        "status": status_val # 状态值: "pending", "success", "fail", "skipped"
     }
-}
+    process_item["stateFlow"].append(entry)
+    process_item["currentState"] = state_name # 更新整体状态
+    print(f"[流程 {process_item.get('id', 'N/A')}][{state_name}]: {msg} (状态: {status_val})")
+
+def arbitrage_process_flow(process_item):
+    """
+    在单独线程中执行的实际套利逻辑。
+    会直接修改 'process_item' 字典。
+    """
+    process_id = process_item['id']
+    symbol = process_item['symbol']
+    onchain_tx_details = process_item['tx'] # 预期包含 'rawTransaction' (原始交易)
+
+    try:
+        pass
+    except Exception as e:
+        print(f"流程 {process_id} ({symbol}) 的套利过程中出错: {e}")
+        # stateFlow 中的最后一个状态应反映错误点
+        process_item['finalStatus'] = "FAILED" # 最终状态:失败
+        # 如果尚未由特定步骤的失败设置
+        if process_item['stateFlow'][-1]['status'] != "fail":
+             add_state_flow_entry(process_item, "PROCESS_FAILED", f"整体流程失败: {e}", "fail")
+
+    finally:
+        # 将条目从 processing_list 移动到 history_process_list
+        with list_lock:
+            # 通过 ID 查找并移除,以确保在列表顺序更改时(尽管仅追加使其不太可能)的健壮性
+            item_to_move = None
+            for i, item in enumerate(processing_list):
+                if item['id'] == process_id:
+                    item_to_move = processing_list.pop(i)
+                    break
+            if item_to_move:
+                history_process_list.append(item_to_move)
+                print(f"流程 {process_id} 已移至历史记录。")
+            else:
+                print(f"警告: 流程 {process_id} 未在 processing_list 中找到,无法移至历史记录。")
+
+        # 更新此交易对的最后处理区块信息 (可选, 用于防止立即重新套利)
+        # current_block = web3.eth.get_block('latest')['number']
+        # last_process_info[symbol] = {"block": current_block, "timestamp": time.time()}
+        # print(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
+
+@app.route('/submit_process', methods=['POST'])
+def handle_submit_process():
+    data = request.get_json()
+    if not data:
+        return jsonify({"error": "无效的 JSON 请求体"}), 400
+
+    required_fields = ['tx', 'profit', 'profitLimit', 'symbol', 'fromToken', 'fromTokenAmountHuman', 'fromTokenDecimal', 'toToken']
+    for field in required_fields:
+        if field not in data:
+            return jsonify({"error": f"缺少字段: {field}"}), 400
+
+    try:
+        profit = decimal.Decimal(str(data['profit']))                                   # 利润
+        profit_limit = decimal.Decimal(str(data['profitLimit']))                        # 利润阈值
+        from_token_amount_human = decimal.Decimal(str(data['fromTokenAmountHuman']))    # fromToken 的人类可读数量
+        from_token_decimal = decimal.Decimal(str(data['fromTokenDecimal']))             # fromToken 的小数位数
+    except (decimal.InvalidOperation, ValueError) as e:
+        return jsonify({"error": f"请求体中包含无效的小数/整数值: {e}"}), 400
+
+    symbol = data['symbol'] # 交易对符号
+
+    # 检查此交易对此区块是否处理过
+    last_info = last_process_info.get(symbol)
+    current_block = web3.eth.block_number
+    if last_info:
+        if current_block - last_info['block'] < MIN_BLOCKS_BETWEEN_ARB:
+            return jsonify({"message": f"已跳过: {symbol} 最近已处理 (区块 {last_info['block']}). 当前区块 {current_block}."}), 200
+
+    if profit >= profit_limit:
+        process_id = str(uuid.uuid4()) # 生成唯一流程ID
+        process_item = {
+            "id": process_id,
+            "creationTime": get_current_timestamp_iso(), # 创建时间
+            "tx": data['tx'], # 交易详情,应包含 rawTransaction
+            "profit": str(profit), # 利润 (字符串存储)
+            "profitLimit": str(profit_limit), # 利润阈值 (字符串存储)
+            "symbol": symbol, # 交易对
+            "fromToken": data['fromToken'], # 起始代币
+            "fromTokenAmountHuman": str(from_token_amount_human), # 起始代币数量 (人类可读, 字符串存储)
+            "fromTokenDecimal": from_token_decimal, # 起始代币小数位数
+            "toToken": data['toToken'], # 目标代币
+            "stateFlow": [], # 状态流转记录
+            "currentState": "PENDING_START", # 当前状态
+            "finalStatus": "PROCESSING" # 最终状态 (PROCESSING / SUCCESS / FAILED)
+        }
+        # 初始状态更新
+        add_state_flow_entry(process_item, "RECEIVED", f"流程已接收。利润 {profit} >= 利润阈值 {profit_limit}。开始套利。", "success")
+
+        with list_lock:
+            processing_list.append(process_item)
+
+        last_process_info[symbol] = current_block
+        print(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
+
+        # 在新线程中开始套利过程
+        arb_thread = threading.Thread(target=arbitrage_process_flow, args=(process_item,), daemon=True)
+        arb_thread.start()
+
+        return jsonify({"message": "套利过程已启动", "process_id": process_id}), 201
+    else:
+        return jsonify({"message": f"利润 {profit} 小于利润阈值 {profit_limit}。不处理。"}), 200
+
+@app.route('/processing', methods=['GET'])
+def get_processing_list():
+    """获取正在处理的任务列表"""
+    with list_lock:
+        # 返回一个副本,以避免在迭代生成 JSON 响应时列表被修改的问题
+        return jsonify(list(processing_list))
+
+@app.route('/history', methods=['GET'])
+def get_history_list():
+    """获取已完成的任务历史列表"""
+    with list_lock:
+        return jsonify(list(history_process_list))
+
+@app.route('/status', methods=['GET'])
+def get_status():
+    """获取系统状态概览"""
+    with list_lock:
+        return jsonify({
+            "processing_count": len(processing_list), # 正在处理的任务数量
+            "history_count": len(history_process_list), # 历史任务数量
+            # "current_nonce_USER_WALLET_if_managed_here": global_nonce_USER_WALLET, # 示例:如果服务器管理此nonce
+            "last_process_info": last_process_info # 最后处理信息 (如果使用)
+        })
 
-# 已处理列表
-history_process_list = []
+if __name__ == "__main__":
+    # 如果此服务器为其自身的交易管理 global_nonce_USER_WALLET,则在此处初始化
+    print("启动 Flask 套利执行服务器...")
+    app.run(host='0.0.0.0', port=5002, debug=False) # 使用与 price_checker 不同的端口