# -*- coding: utf-8 -*- from checker.logger_config import get_logger logger = get_logger('as') logger.info('\n\n----------------------------------------------as啓動--------------------------------------------') import threading import uuid # 用于生成唯一的流程ID import time import logging import s_erc20_to_mexc import s_mexc_to_erc20 import web3_py_client import traceback import copy import sys import pandas as pd import io from decimal import Decimal, ROUND_DOWN from flask import Flask, request, jsonify, send_file from flask_cors import CORS # 导入 from as_utils import get_formatted_timestamp from as_utils import add_state_flow_entry from binance.client import Client # 用于获取ETH价格 from mexc_client import MexcClient from pprint import pprint from pprint import pformat from config import rpc_url # 配置日志 log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) web3 = web3_py_client.EthClient(rpc_url) w3 = web3.w3 mexc = MexcClient() # 该代币最后一次执行套利的区块信息 (如果需要防止过于频繁的同类套利,不然变成砸盘、拉盘的了) last_process_info = {} # 示例: {"RATO_USDT": 0} MIN_BLOCKS_BETWEEN_ARB = Decimal(1) # 在重试相同交易对之前等待几个区块 # --- 全局状态和锁 --- processing_list = [] # 正在处理的任务列表 history_process_list = [] # 已完成的任务历史列表 list_lock = threading.Lock() # 用于修改 processing_list 和 history_process_list 结构的锁 # --- 一些核心數據和鎖 --- core_data = { "eth_price": 0, # 全局 eth價格 "block_number": 0, # 全局 區塊號 "block": None, # 全局 最后一個區塊的信息 } core_lock = threading.Lock() # 核心數據的锁 # --- mexc相關數據和鎖 --- mexc_data = { "account_info": {}, "coin_info_map": {}, # 處理過的幣種信息,coin_info_map[coin][network] } mexc_lock = threading.Lock() CHAIN_ID = -1 try: if w3.provider: CHAIN_ID = w3.eth.chain_id logger.info(f"Web3 已连接。chain_id={CHAIN_ID}") else: logger.info("Web3 未连接。") except Exception as e: logger.info(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}") # Binance 客户端 (无需API Key/Secret即可获取公开行情数据) try: binance_client = Client() # 测试连接 (可选,但建议) binance_client.ping() logger.info("成功连接到 Binance API。") except Exception as e: logger.error(f"初始化 Binance Client 时发生错误: {e}") binance_client = None # --- Flask 应用 --- app = Flask(__name__) CORS(app) # 在创建 app 实例后启用 CORS def move_completed_process_to_history(process_id_to_move: str) -> bool: """ 将一个完成的 process_item 从 processing_list 移动到 history_process_list。 此操作是线程安全的。 Args: process_id_to_move (str): 要移动的 process_item 的 ID。 Returns: bool: 如果成功找到并移动了 item,则返回 True,否则返回 False。 """ global processing_list, history_process_list # 因为我们要修改这两个列表 item_to_move = None moved_successfully = False with list_lock: # 查找并从 processing_list 中移除 found_index = -1 for i, item in enumerate(processing_list): if item.get('id') == process_id_to_move: found_index = i break if found_index != -1: item_to_move = processing_list.pop(found_index) # 从 processing_list 中移除并获取它 # 假设在 item_to_move 中,其 currentState 已经被 arbitrage_process_flow 更新为 COMPLETED 或 FAILED # arbitrage_process.add_state_flow_entry(item_to_move, "MOVED_TO_HISTORY", f"流程处理完毕,移至历史记录。最终状态: {item_to_move.get('currentState', 'N/A')}", "info") if item_to_move['currentState'] in ['COMPLETED', 'FAILED']: history_process_list.append(item_to_move) # 添加到 history_process_list logger.info(f"已将 process_id: {process_id_to_move} 从 processing_list 移动到 history_process_list。") moved_successfully = True else: logger.warning(f"尝试移动到 history_list 时,在 processing_list 中未找到 process_id: {process_id_to_move}") return moved_successfully # 策略構建器 def strategy_builder(process_item): strategy = process_item['strategy'] global core_data global core_lock global mexc_data global mexc_lock process_item_formated = pformat(process_item, indent=2) logger.info(f'策略原始参数:\n{process_item_formated}') if strategy == 'erc20_to_mexc': return s_erc20_to_mexc.ArbitrageProcess(process_item, core_data, core_lock, mexc_data, mexc_lock ) # elif strategy == 'mexc_to_erc20': # return s_mexc_to_erc20.ArbitrageProcess(process_item, # core_data, core_lock, # mexc_data, mexc_lock # ) else: logger.error(f'不存在的策略:{strategy}') # 實際套利邏輯 def arbitrage_process_flow(process_item): """ 在单独线程中执行的实际套利逻辑。 会直接修改 'process_item' 字典。 """ process_id = process_item['id'] ap = strategy_builder(process_item) # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程 ap._set_state(ap.STATE_CHECK) # 在主循环中周期性调用 run_arbitrage_step while ap.current_state != ap.STATE_COMPLETED and ap.current_state != ap.STATE_FAILED and ap.current_state != ap.STATE_REJECT: ap.run_arbitrage_step() ap.run_arbitrage_step() move_completed_process_to_history(process_id) # --- 核心數據更新綫程函數 --- def update_core_data_periodically(): """ 周期性更新 nonce 和 ETH 价格的线程函数。 """ global core_data # 明确表示我们要修改全局的 core_data while True: try: new_eth_price = None new_eth_balance = None new_nonce = None new_block_number = None new_block = None # 1. 从 Binance 获取 ETH 价格 if binance_client: try: ticker = binance_client.get_symbol_ticker(symbol="ETHUSDT") new_eth_price = float(ticker['price']) except Exception as e: logger.error(f"从 Binance 获取 ETH 价格失败: {e}") else: logger.warning("Binance client 未初始化, 无法获取 ETH 价格。") # 2. 获取最新的block_number # 确保 w3 已初始化且 USER_WALLET 已配置 if w3 and w3.is_connected(): try: new_block = w3.eth.get_block('latest') new_block_number = new_block['number'] except Exception as e: logger.error(f"获取 BlockNumber 失败: {e}") elif not (w3 and w3.is_connected()): logger.warning("Web3 未连接, 无法获取 BlockNumber。") # 3. 更新共享数据 core_data (使用锁) # 只有当获取到新数据时才更新,避免不必要的写操作和日志 with core_lock: if new_eth_price is not None and core_data["eth_price"] != new_eth_price: eth_price = Decimal(new_eth_price) eth_price = eth_price.quantize(Decimal('1e-2'), rounding=ROUND_DOWN) core_data["eth_price"] = eth_price # 判斷block_number是否發生變化(升高) if new_block_number is not None and new_block_number > core_data["block_number"]: core_data["block_number"] = new_block_number core_data["block"] = new_block # 區塊變了才刷新nonce,否則還是要靠本地的緩存維護 if new_nonce is not None and core_data["nonce"] != new_nonce: core_data["nonce"] = new_nonce # 餘額也同理 if new_eth_balance is not None and core_data["eth_balance"] != new_eth_balance: core_data["eth_balance"] = new_eth_balance # logger.info(f"核心数据已更新: ETH Price = {core_data['eth_price']}, Nonce ({USER_WALLET}) = {core_data['nonce']}, EthBalance={core_data['eth_balance']}, BlockNumber = {core_data['block_number']}") except Exception as e: # 捕获线程循环中的其他潜在错误 exc_traceback = traceback.format_exc() logger.error(f"数据更新线程发生未知错误\n{exc_traceback}") # traceback.print_exc() # 等待 500ms time.sleep(0.5) # --- mexc數據更新綫程函數 --- def update_mexc_data_periodically(): """ 周期性更新 mexc的相關數據 的线程函数。 """ global mexc_data # 每60秒獲取一次coin_info coin_info_get_delay = 60 while True: try: new_account_info = None new_coin_info_list = None # 1. new_account_info try: new_account_info = mexc.trade.get_account_info() if 'balances' not in new_account_info: raise Exception("未找到balances") with mexc_lock: mexc_data['account_info'] = new_account_info # logger.info(f'account_info: {new_account_info['balances']}') except Exception as e: logger.error(f"从 Mexc 获取 Balance 失败: {e}, {new_account_info}") # 2. new_coin_info list try: if coin_info_get_delay >= 60: coin_info_get_delay = 0 new_coin_info_list = mexc.wallet.get_coinlist() if not isinstance(new_coin_info_list, list): raise Exception("幣種信息獲取錯誤") # 處理幣種信息 new_coin_info_map = {} for coin_info in new_coin_info_list: new_coin_info_map[coin_info['coin']] = {} for network in coin_info['networkList']: new_coin_info_map[coin_info['coin']][network['netWork']] = network with mexc_lock: mexc_data['coin_info_map'] = new_coin_info_map # logger.info(f'coin_info_map: {new_coin_info_map['USDT']}') except Exception as e: logger.error(f"从 Mexc 获取 coinlist 失败: {e}, {new_coin_info_list}") except Exception as e: # 捕获线程循环中的其他潜在错误 exc_traceback = traceback.format_exc() logger.error(f"数据更新线程发生未知错误\n{exc_traceback}") # traceback.print_exc() # 幣種信息處理的delay coin_info_get_delay = coin_info_get_delay + 1 # 等待 1s time.sleep(1) @app.route('/submit_process', methods=['POST']) def handle_submit_process(): data = request.get_json() if not data: return jsonify({"error": "无效的 JSON 请求体"}), 400 required_fields = ['symbol', 'strategy'] for field in required_fields: if field not in data: return jsonify({"error": f"缺少字段: {field}, keys: {data.keys()}"}), 400 symbol = data['symbol'] # 交易对符号 # 检查此交易对此区块是否处理过 last_trade_block = last_process_info.get(symbol) with core_lock: current_block = core_data['block_number'] if last_trade_block: if current_block - last_trade_block < MIN_BLOCKS_BETWEEN_ARB: return jsonify({"message": f"已跳过: {symbol} 最近已处理 (区块 {last_trade_block}). 当前区块 {current_block}."}), 200 process_id = str(uuid.uuid4()) # 生成唯一流程ID process_item = copy.deepcopy(data) process_item['id'] = process_id process_item['profit'] = Decimal(0) process_item['creationTime'] = get_formatted_timestamp(), # 创建时间 process_item['stateFlow'] = [] # 状态流转记录 process_item['currentState'] = "PENDING_START" # 初始状态更新 add_state_flow_entry(process_item, "RECEIVED", f"流程已接收。开始套利。", "success") with list_lock: processing_list.append(process_item) last_process_info[symbol] = current_block logger.info(f"已更新 {symbol} 的最后处理信息至区块 {current_block}") # 在新线程中开始套利过程 arb_thread = threading.Thread(target=arbitrage_process_flow, args=(process_item,), daemon=True) arb_thread.start() return jsonify({"message": "套利过程已启动", "process_id": process_id}), 201 @app.route('/processing', methods=['GET']) def get_processing_list(): """获取正在处理的任务列表""" with list_lock: # 返回一个副本,以避免在迭代生成 JSON 响应时列表被修改的问题 return jsonify(list(processing_list)) @app.route('/history', methods=['GET']) def get_history_list(): """获取已完成的任务历史列表""" with list_lock: return jsonify(list(history_process_list)) @app.route('/status', methods=['GET']) def get_status(): """获取系统状态概览""" with list_lock: return jsonify({ "processing_count": len(processing_list), # 正在处理的任务数量 "history_count": len(history_process_list), # 历史任务数量 # "current_nonce_USER_WALLET_if_managed_here": global_nonce_USER_WALLET, # 示例:如果服务器管理此nonce "last_process_info": last_process_info # 最后处理信息 (如果使用) }) @app.route('/download_history_excel', methods=['GET']) def download_history_excel(): """下载历史记录的 Excel 文件""" with list_lock: # 为了线程安全,复制一份列表进行处理 history_data = list(history_process_list) if not history_data: # 如果没有数据,可以返回一个空文件或一个提示信息 return "没有历史记录可以下载。", 404 # 1. 使用 pandas 将列表数据转换成 DataFrame # 假设您的列表中的每个元素都是一个字典,例如: # {'交易对': 'DORKY_USDT', '利润': 1.23, '创建时间': '...'} df = pd.DataFrame(history_data) # (可选) 整理列的顺序或重命名,让 Excel 更美观 # df = df[['创建时间', '交易对', '利润', '最终状态']] # df.rename(columns={'创建时间': '交易发生时间'}, inplace=True) # 2. 在内存中创建一个 Excel 文件 output = io.BytesIO() # 使用 to_excel 方法,并用 openpyxl 引擎 # index=False 表示不把 DataFrame 的索引写入 Excel with pd.ExcelWriter(output, engine='openpyxl') as writer: df.to_excel(writer, index=False, sheet_name='History') # 移动到二进制流的开头 output.seek(0) # 3. 使用 send_file 将内存中的文件作为附件发送给浏览器 return send_file( output, # attachment_filename 是浏览器下载时默认显示的文件名 download_name='history_records.xlsx', # as_attachment=True 表示作为附件下载,而不是在浏览器中打开 as_attachment=True, # mimetype 告诉浏览器这是一个 Excel 文件 mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) if __name__ == "__main__": logger.info("启动核心数据更新线程...") updater_thread = threading.Thread(target=update_core_data_periodically, daemon=True) updater_thread.start() logger.info("启动抹茶数据更新线程...") updater_thread = threading.Thread(target=update_mexc_data_periodically, daemon=True) updater_thread.start() logger.info("主线程继续执行,可以执行其他任务或保持运行以观察数据更新。") logger.info("启动 Flask 套利执行服务器...") app.run(host='0.0.0.0', port=1888, debug=False) # 使用与 price_checker 不同的端口