|
|
@@ -1,7 +1,10 @@
|
|
|
import decimal
|
|
|
import threading
|
|
|
import uuid # 用于生成唯一的流程ID
|
|
|
-from datetime import datetime, timezone
|
|
|
+import datetime
|
|
|
+import logging
|
|
|
+# 配置日志
|
|
|
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
|
from flask import Flask, request, jsonify
|
|
|
from web3_py_client import EthClient # 你特定的客户端
|
|
|
@@ -28,36 +31,46 @@ global_nonce_USER_WALLET = 0 # 从 USER_WALLET 发送交易的全局 Nonce
|
|
|
nonce_lock_USER_WALLET = threading.Lock() # USER_WALLET Nonce 的锁
|
|
|
|
|
|
try:
|
|
|
- if web3.provider: # 检查 web3 是否已某种程度初始化
|
|
|
+ if web3.w3.provider: # 检查 web3 是否已某种程度初始化
|
|
|
# 这个全局 nonce 应该小心初始化。
|
|
|
# 如果 price_checker 发送交易,它应该管理 USER_WALLET 的 tx 的 nonce。
|
|
|
# 这个服务器的 global_nonce 是针对它自己可能创建的 tx。
|
|
|
# 暂时假设传入的 TX 具有其 nonce 或 price_checker 处理了它。
|
|
|
- global_nonce_USER_WALLET = web3.eth.get_transaction_count(USER_WALLET, 'latest')
|
|
|
- print(f"如果服务器要创建交易,{USER_WALLET} 的初始 nonce 将在此处获取。")
|
|
|
+ global_nonce_USER_WALLET = web3.w3.eth.get_transaction_count(USER_WALLET, 'latest')
|
|
|
+ logging.info(f"如果服务器要创建交易,{USER_WALLET} 的初始 nonce 将在此处获取。")
|
|
|
else:
|
|
|
- print("Web3 提供者未连接, USER_WALLET 的全局 nonce 未初始化。")
|
|
|
+ logging.info("Web3 提供者未连接, USER_WALLET 的全局 nonce 未初始化。")
|
|
|
except Exception as e:
|
|
|
- print(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}")
|
|
|
+ logging.info(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}")
|
|
|
|
|
|
# --- Flask 应用 ---
|
|
|
app = Flask(__name__)
|
|
|
|
|
|
-def get_current_timestamp_iso():
|
|
|
- """获取当前 UTC 时间的 ISO 格式字符串"""
|
|
|
- return datetime.now(timezone.utc).isoformat()
|
|
|
+def get_formatted_timestamp():
|
|
|
+ """
|
|
|
+ 获取指定格式的时间戳: YYYY-MM-DD HH:MM:SS,ms
|
|
|
+ 例如: 2025-05-16 14:44:09,324
|
|
|
+ """
|
|
|
+ now = datetime.datetime.now()
|
|
|
+ # 格式化日期和时间部分
|
|
|
+ timestamp_str = now.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ # 获取毫秒部分,并格式化为3位数字
|
|
|
+ milliseconds = now.microsecond // 1000
|
|
|
+ milliseconds_str = f"{milliseconds:03d}"
|
|
|
+ # 组合最终格式
|
|
|
+ return f"{timestamp_str},{milliseconds_str}"
|
|
|
|
|
|
def add_state_flow_entry(process_item, state_name, msg, status_val="pending"):
|
|
|
"""辅助函数,用于向 stateFlow 列表添加条目。"""
|
|
|
entry = {
|
|
|
"stateName": state_name, # 状态名称
|
|
|
- "timestamp": get_current_timestamp_iso(), # 时间戳
|
|
|
+ "timestamp": get_formatted_timestamp(), # 时间戳
|
|
|
"msg": msg, # 消息
|
|
|
"status": status_val # 状态值: "pending", "success", "fail", "skipped"
|
|
|
}
|
|
|
process_item["stateFlow"].append(entry)
|
|
|
process_item["currentState"] = state_name # 更新整体状态
|
|
|
- print(f"[流程 {process_item.get('id', 'N/A')}][{state_name}]: {msg} (状态: {status_val})")
|
|
|
+ logging.info(f"[流程 {process_item.get('id', 'N/A')}][{state_name}]: {msg} (状态: {status_val})")
|
|
|
|
|
|
def arbitrage_process_flow(process_item):
|
|
|
"""
|
|
|
@@ -71,7 +84,7 @@ def arbitrage_process_flow(process_item):
|
|
|
try:
|
|
|
pass
|
|
|
except Exception as e:
|
|
|
- print(f"流程 {process_id} ({symbol}) 的套利过程中出错: {e}")
|
|
|
+ logging.info(f"流程 {process_id} ({symbol}) 的套利过程中出错: {e}")
|
|
|
# stateFlow 中的最后一个状态应反映错误点
|
|
|
process_item['finalStatus'] = "FAILED" # 最终状态:失败
|
|
|
# 如果尚未由特定步骤的失败设置
|
|
|
@@ -89,14 +102,14 @@ def arbitrage_process_flow(process_item):
|
|
|
break
|
|
|
if item_to_move:
|
|
|
history_process_list.append(item_to_move)
|
|
|
- print(f"流程 {process_id} 已移至历史记录。")
|
|
|
+ logging.info(f"流程 {process_id} 已移至历史记录。")
|
|
|
else:
|
|
|
- print(f"警告: 流程 {process_id} 未在 processing_list 中找到,无法移至历史记录。")
|
|
|
+ logging.info(f"警告: 流程 {process_id} 未在 processing_list 中找到,无法移至历史记录。")
|
|
|
|
|
|
# 更新此交易对的最后处理区块信息 (可选, 用于防止立即重新套利)
|
|
|
# current_block = web3.eth.get_block('latest')['number']
|
|
|
# last_process_info[symbol] = {"block": current_block, "timestamp": time.time()}
|
|
|
- # print(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
|
|
|
+ # logging.info(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
|
|
|
|
|
|
@app.route('/submit_process', methods=['POST'])
|
|
|
def handle_submit_process():
|
|
|
@@ -130,7 +143,7 @@ def handle_submit_process():
|
|
|
process_id = str(uuid.uuid4()) # 生成唯一流程ID
|
|
|
process_item = {
|
|
|
"id": process_id,
|
|
|
- "creationTime": get_current_timestamp_iso(), # 创建时间
|
|
|
+ "creationTime": get_formatted_timestamp(), # 创建时间
|
|
|
"tx": data['tx'], # 交易详情,应包含 rawTransaction
|
|
|
"profit": str(profit), # 利润 (字符串存储)
|
|
|
"profitLimit": str(profit_limit), # 利润阈值 (字符串存储)
|
|
|
@@ -150,7 +163,7 @@ def handle_submit_process():
|
|
|
processing_list.append(process_item)
|
|
|
|
|
|
last_process_info[symbol] = current_block
|
|
|
- print(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
|
|
|
+ logging.info(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
|
|
|
|
|
|
# 在新线程中开始套利过程
|
|
|
arb_thread = threading.Thread(target=arbitrage_process_flow, args=(process_item,), daemon=True)
|
|
|
@@ -186,5 +199,5 @@ def get_status():
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
# 如果此服务器为其自身的交易管理 global_nonce_USER_WALLET,则在此处初始化
|
|
|
- print("启动 Flask 套利执行服务器...")
|
|
|
+ logging.info("启动 Flask 套利执行服务器...")
|
|
|
app.run(host='0.0.0.0', port=5002, debug=False) # 使用与 price_checker 不同的端口
|