as.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. # -*- coding: utf-8 -*-
  2. import decimal
  3. import threading
  4. import uuid # 用于生成唯一的流程ID
  5. import time
  6. import logging
  7. import erc20_to_mexc_first_sell
  8. # 配置日志
  9. log = logging.getLogger('werkzeug')
  10. log.setLevel(logging.ERROR)
  11. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  12. from flask import Flask, request, jsonify
  13. from flask_cors import CORS # 导入
  14. from web3_py_client import EthClient # 你特定的客户端
  15. from as_utils import get_formatted_timestamp
  16. from as_utils import add_state_flow_entry
  17. web3 = EthClient()
  18. USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4' # 用户钱包地址
  19. USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA' # 用户在交易所的钱包地址 (用于充值)
  20. # 该代币最后一次执行套利的区块信息 (如果需要防止过于频繁的同类套利,不然变成砸盘、拉盘的了)
  21. last_process_info = {} # 示例: {"RATO_USDT": 0}
  22. MIN_BLOCKS_BETWEEN_ARB = decimal.Decimal(5) # 在重试相同交易对之前等待几个区块
  23. # --- 全局状态和锁 ---
  24. processing_list = [] # 正在处理的任务列表
  25. history_process_list = [] # 已完成的任务历史列表
  26. list_lock = threading.Lock() # 用于修改 processing_list 和 history_process_list 结构的锁
  27. # USER_WALLET 的 Nonce 管理,前提是此服务器为该钱包发起交易。
  28. # 如果传入的 'tx' 已预签名并包含 nonce,则此全局 nonce 对该特定 tx 不太重要。
  29. # 但如果此服务器要从 USER_WALLET 发起*其他*交易(例如代币批准),则此 nonce 很重要。
  30. global_nonce_USER_WALLET = 0 # 从 USER_WALLET 发送交易的全局 Nonce
  31. nonce_lock_USER_WALLET = threading.Lock() # USER_WALLET Nonce 的锁
  32. try:
  33. if web3.w3.provider: # 检查 web3 是否已某种程度初始化
  34. # 这个全局 nonce 应该小心初始化。
  35. # 如果 price_checker 发送交易,它应该管理 USER_WALLET 的 tx 的 nonce。
  36. # 这个服务器的 global_nonce 是针对它自己可能创建的 tx。
  37. # 暂时假设传入的 TX 具有其 nonce 或 price_checker 处理了它。
  38. global_nonce_USER_WALLET = web3.w3.eth.get_transaction_count(USER_WALLET, 'latest')
  39. logging.info(f"如果服务器要创建交易,{USER_WALLET} 的初始 nonce 将在此处获取。")
  40. else:
  41. logging.info("Web3 提供者未连接, USER_WALLET 的全局 nonce 未初始化。")
  42. except Exception as e:
  43. logging.info(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}")
  44. # --- Flask 应用 ---
  45. app = Flask(__name__)
  46. CORS(app) # 在创建 app 实例后启用 CORS
  47. def move_completed_process_to_history(process_id_to_move: str) -> bool:
  48. """
  49. 将一个完成的 process_item 从 processing_list 移动到 history_process_list。
  50. 此操作是线程安全的。
  51. Args:
  52. process_id_to_move (str): 要移动的 process_item 的 ID。
  53. Returns:
  54. bool: 如果成功找到并移动了 item,则返回 True,否则返回 False。
  55. """
  56. global processing_list, history_process_list # 因为我们要修改这两个列表
  57. item_to_move = None
  58. moved_successfully = False
  59. with list_lock:
  60. # 查找并从 processing_list 中移除
  61. found_index = -1
  62. for i, item in enumerate(processing_list):
  63. if item.get('id') == process_id_to_move:
  64. found_index = i
  65. break
  66. if found_index != -1:
  67. item_to_move = processing_list.pop(found_index) # 从 processing_list 中移除并获取它
  68. # 假设在 item_to_move 中,其 currentState 已经被 arbitrage_process_flow 更新为 COMPLETED 或 FAILED
  69. # arbitrage_process.add_state_flow_entry(item_to_move, "MOVED_TO_HISTORY", f"流程处理完毕,移至历史记录。最终状态: {item_to_move.get('currentState', 'N/A')}", "info")
  70. history_process_list.append(item_to_move) # 添加到 history_process_list
  71. logging.info(f"已将 process_id: {process_id_to_move} 从 processing_list 移动到 history_process_list。")
  72. moved_successfully = True
  73. else:
  74. logging.warning(f"尝试移动到 history_list 时,在 processing_list 中未找到 process_id: {process_id_to_move}")
  75. return moved_successfully
  76. def arbitrage_process_flow(process_item):
  77. """
  78. 在单独线程中执行的实际套利逻辑。
  79. 会直接修改 'process_item' 字典。
  80. """
  81. process_id = process_item['id']
  82. SYMBOL = process_item['symbol']
  83. tx = process_item['tx'] # 预期包含 'rawTransaction' (原始交易)
  84. FROM_TOKEN = process_item['fromToken']
  85. TO_TOKEN = process_item['toToken']
  86. FROM_TOKEN_AMOUNT_HUMAM = process_item['fromTokenAmountHuman']
  87. TO_TOKEN_AMOUNT_HUMAM = process_item['toTokenAmountHuman']
  88. profit = float(process_item['profit'])
  89. USER_EXCHANGE_WALLET = process_item['userExchangeWallet']
  90. USER_WALLET = process_item['userWallet']
  91. SYMBOL = process_item['symbol']
  92. EXCHANGE_OUT_AMOUNT = process_item['exchangeOutAmount']
  93. gas_price_multiplier = 1
  94. if profit > 2:
  95. gas_price_multiplier = 1.1
  96. elif profit > 5:
  97. gas_price_multiplier = 1.5
  98. elif profit > 10:
  99. gas_price_multiplier = 2
  100. gas_limit_multiplier = 1.2
  101. ap = erc20_to_mexc_first_sell.ArbitrageProcess(tx, gas_limit_multiplier, gas_price_multiplier,
  102. FROM_TOKEN, TO_TOKEN,
  103. FROM_TOKEN_AMOUNT_HUMAM, EXCHANGE_OUT_AMOUNT,
  104. USER_EXCHANGE_WALLET, USER_WALLET,
  105. SYMBOL, process_item)
  106. # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
  107. ap._set_state(ap.STATE_CHECK)
  108. # 在主循环中周期性调用 run_arbitrage_step
  109. while ap.current_state != ap.STATE_COMPLETED and ap.current_state != ap.STATE_FAILED and ap.current_state != ap.STATE_REJECT:
  110. ap.run_arbitrage_step()
  111. if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
  112. time.sleep(10)
  113. ap.run_arbitrage_step()
  114. move_completed_process_to_history(process_id)
  115. @app.route('/submit_process', methods=['POST'])
  116. def handle_submit_process():
  117. data = request.get_json()
  118. if not data:
  119. return jsonify({"error": "无效的 JSON 请求体"}), 400
  120. required_fields = ['tx', 'profit', 'profitLimit', 'symbol', 'fromToken', 'fromTokenAmountHuman', 'fromTokenDecimal', 'toToken', 'toTokenAmountHuman', 'exchangeOutAmount']
  121. for field in required_fields:
  122. if field not in data:
  123. return jsonify({"error": f"缺少字段: {field}"}), 400
  124. try:
  125. profit = decimal.Decimal(str(data['profit'])) # 利润
  126. profit_limit = decimal.Decimal(str(data['profitLimit'])) # 利润阈值
  127. from_token_amount_human = decimal.Decimal(str(data['fromTokenAmountHuman'])) # fromToken 的人类可读数量
  128. from_token_decimal = decimal.Decimal(str(data['fromTokenDecimal'])) # fromToken 的小数位数
  129. to_token_amount_human = decimal.Decimal(str(data['toTokenAmountHuman'])) # toToken 的人类可读数量
  130. exchange_out_amount = decimal.Decimal(str(data['exchangeOutAmount'])) # 交易所需要卖出的数量
  131. except (decimal.InvalidOperation, ValueError) as e:
  132. return jsonify({"error": f"请求体中包含无效的小数/整数值: {e}"}), 400
  133. symbol = data['symbol'] # 交易对符号
  134. # 检查此交易对此区块是否处理过
  135. last_trade_block = last_process_info.get(symbol)
  136. current_block = web3.w3.eth.block_number
  137. if last_trade_block:
  138. if current_block - last_trade_block < MIN_BLOCKS_BETWEEN_ARB:
  139. return jsonify({"message": f"已跳过: {symbol} 最近已处理 (区块 {last_trade_block}). 当前区块 {current_block}."}), 200
  140. if profit >= profit_limit:
  141. process_id = str(uuid.uuid4()) # 生成唯一流程ID
  142. process_item = {
  143. "id": process_id,
  144. "creationTime": get_formatted_timestamp(), # 创建时间
  145. "tx": data['tx'], # 交易详情,应包含 rawTransaction
  146. "profit": str(profit), # 利润 (字符串存储)
  147. "profitLimit": str(profit_limit), # 利润阈值 (字符串存储)
  148. "symbol": symbol, # 交易对
  149. "userWallet": USER_WALLET,
  150. "userExchangeWallet": USER_EXCHANGE_WALLET,
  151. "fromToken": data['fromToken'], # 起始代币
  152. "fromTokenAmountHuman": str(from_token_amount_human), # 起始代币数量 (人类可读, 字符串存储)
  153. "fromTokenDecimal": from_token_decimal, # 起始代币小数位数
  154. "toTokenAmountHuman": str(to_token_amount_human),
  155. "exchangeOutAmount": str(exchange_out_amount),
  156. "toToken": data['toToken'], # 目标代币
  157. "stateFlow": [], # 状态流转记录
  158. "currentState": "PENDING_START", # 当前状态
  159. }
  160. # 初始状态更新
  161. add_state_flow_entry(process_item, "RECEIVED", f"流程已接收。利润 {profit} >= 利润阈值 {profit_limit}。开始套利。", "success")
  162. with list_lock:
  163. processing_list.append(process_item)
  164. last_process_info[symbol] = current_block
  165. logging.info(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
  166. # 在新线程中开始套利过程
  167. arb_thread = threading.Thread(target=arbitrage_process_flow, args=(process_item,), daemon=True)
  168. arb_thread.start()
  169. return jsonify({"message": "套利过程已启动", "process_id": process_id}), 201
  170. else:
  171. return jsonify({"message": f"利润 {profit} 小于利润阈值 {profit_limit}。不处理。"}), 200
  172. @app.route('/processing', methods=['GET'])
  173. def get_processing_list():
  174. """获取正在处理的任务列表"""
  175. with list_lock:
  176. # 返回一个副本,以避免在迭代生成 JSON 响应时列表被修改的问题
  177. return jsonify(list(processing_list))
  178. @app.route('/history', methods=['GET'])
  179. def get_history_list():
  180. """获取已完成的任务历史列表"""
  181. with list_lock:
  182. return jsonify(list(history_process_list))
  183. @app.route('/status', methods=['GET'])
  184. def get_status():
  185. """获取系统状态概览"""
  186. with list_lock:
  187. return jsonify({
  188. "processing_count": len(processing_list), # 正在处理的任务数量
  189. "history_count": len(history_process_list), # 历史任务数量
  190. # "current_nonce_USER_WALLET_if_managed_here": global_nonce_USER_WALLET, # 示例:如果服务器管理此nonce
  191. "last_process_info": last_process_info # 最后处理信息 (如果使用)
  192. })
  193. if __name__ == "__main__":
  194. # 如果此服务器为其自身的交易管理 global_nonce_USER_WALLET,则在此处初始化
  195. logging.info("启动 Flask 套利执行服务器...")
  196. app.run(host='0.0.0.0', port=188, debug=False) # 使用与 price_checker 不同的端口