as.py 16 KB


  1. # -*- coding: utf-8 -*-
  2. from checker.logger_config import get_logger
  3. logger = get_logger('as')
  4. logger.info('\n\n----------------------------------------------as啓動--------------------------------------------')
  5. import threading
  6. import uuid # 用于生成唯一的流程ID
  7. import time
  8. import logging
  9. import s_erc20_to_mexc
  10. import s_mexc_to_erc20
  11. import web3_py_client
  12. import traceback
  13. import copy
  14. import sys
  15. import pandas as pd
  16. import io
  17. from decimal import Decimal, ROUND_DOWN
  18. from flask import Flask, request, jsonify, send_file
  19. from flask_cors import CORS # 导入
  20. from as_utils import get_formatted_timestamp
  21. from as_utils import add_state_flow_entry
  22. from binance.client import Client # 用于获取ETH价格
  23. from mexc_client import MexcClient
  24. from pprint import pprint
  25. from pprint import pformat
  26. from config import rpc_url
  27. # 配置日志
  28. log = logging.getLogger('werkzeug')
  29. log.setLevel(logging.ERROR)
  30. web3 = web3_py_client.EthClient(rpc_url)
  31. w3 = web3.w3
  32. mexc = MexcClient()
  33. # 该代币最后一次执行套利的区块信息 (如果需要防止过于频繁的同类套利,不然变成砸盘、拉盘的了)
  34. last_process_info = {} # 示例: {"RATO_USDT": 0}
  35. MIN_BLOCKS_BETWEEN_ARB = Decimal(1) # 在重试相同交易对之前等待几个区块
  36. # --- 全局状态和锁 ---
  37. processing_list = [] # 正在处理的任务列表
  38. history_process_list = [] # 已完成的任务历史列表
  39. list_lock = threading.Lock() # 用于修改 processing_list 和 history_process_list 结构的锁
  40. # --- 一些核心數據和鎖 ---
  41. core_data = {
  42. "eth_price": 0, # 全局 eth價格
  43. "block_number": 0, # 全局 區塊號
  44. "block": None, # 全局 最后一個區塊的信息
  45. }
  46. core_lock = threading.Lock() # 核心數據的锁
  47. # --- mexc相關數據和鎖 ---
  48. mexc_data = {
  49. "account_info": {},
  50. "coin_info_map": {}, # 處理過的幣種信息,coin_info_map[coin][network]
  51. }
  52. mexc_lock = threading.Lock()
  53. CHAIN_ID = -1
  54. try:
  55. if w3.provider:
  56. CHAIN_ID = w3.eth.chain_id
  57. logger.info(f"Web3 已连接。chain_id={CHAIN_ID}")
  58. else:
  59. logger.info("Web3 未连接。")
  60. except Exception as e:
  61. logger.info(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}")
  62. # Binance 客户端 (无需API Key/Secret即可获取公开行情数据)
  63. try:
  64. binance_client = Client()
  65. # 测试连接 (可选,但建议)
  66. binance_client.ping()
  67. logger.info("成功连接到 Binance API。")
  68. except Exception as e:
  69. logger.error(f"初始化 Binance Client 时发生错误: {e}")
  70. binance_client = None
  71. # --- Flask 应用 ---
  72. app = Flask(__name__)
  73. CORS(app) # 在创建 app 实例后启用 CORS
  74. def move_completed_process_to_history(process_id_to_move: str) -> bool:
  75. """
  76. 将一个完成的 process_item 从 processing_list 移动到 history_process_list。
  77. 此操作是线程安全的。
  78. Args:
  79. process_id_to_move (str): 要移动的 process_item 的 ID。
  80. Returns:
  81. bool: 如果成功找到并移动了 item,则返回 True,否则返回 False。
  82. """
  83. global processing_list, history_process_list # 因为我们要修改这两个列表
  84. item_to_move = None
  85. moved_successfully = False
  86. with list_lock:
  87. # 查找并从 processing_list 中移除
  88. found_index = -1
  89. for i, item in enumerate(processing_list):
  90. if item.get('id') == process_id_to_move:
  91. found_index = i
  92. break
  93. if found_index != -1:
  94. item_to_move = processing_list.pop(found_index) # 从 processing_list 中移除并获取它
  95. # 假设在 item_to_move 中,其 currentState 已经被 arbitrage_process_flow 更新为 COMPLETED 或 FAILED
  96. # arbitrage_process.add_state_flow_entry(item_to_move, "MOVED_TO_HISTORY", f"流程处理完毕,移至历史记录。最终状态: {item_to_move.get('currentState', 'N/A')}", "info")
  97. if item_to_move['currentState'] in ['COMPLETED', 'FAILED']:
  98. history_process_list.append(item_to_move) # 添加到 history_process_list
  99. logger.info(f"已将 process_id: {process_id_to_move} 从 processing_list 移动到 history_process_list。")
  100. moved_successfully = True
  101. else:
  102. logger.warning(f"尝试移动到 history_list 时,在 processing_list 中未找到 process_id: {process_id_to_move}")
  103. return moved_successfully
  104. # 策略構建器
  105. def strategy_builder(process_item):
  106. strategy = process_item['strategy']
  107. global core_data
  108. global core_lock
  109. global mexc_data
  110. global mexc_lock
  111. process_item_formated = pformat(process_item, indent=2)
  112. logger.info(f'策略原始参数:\n{process_item_formated}')
  113. if strategy == 'erc20_to_mexc':
  114. return s_erc20_to_mexc.ArbitrageProcess(process_item,
  115. core_data, core_lock,
  116. mexc_data, mexc_lock
  117. )
  118. # elif strategy == 'mexc_to_erc20':
  119. # return s_mexc_to_erc20.ArbitrageProcess(process_item,
  120. # core_data, core_lock,
  121. # mexc_data, mexc_lock
  122. # )
  123. else:
  124. logger.error(f'不存在的策略:{strategy}')
  125. # 實際套利邏輯
  126. def arbitrage_process_flow(process_item):
  127. """
  128. 在单独线程中执行的实际套利逻辑。
  129. 会直接修改 'process_item' 字典。
  130. """
  131. process_id = process_item['id']
  132. ap = strategy_builder(process_item)
  133. # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
  134. ap._set_state(ap.STATE_CHECK)
  135. # 在主循环中周期性调用 run_arbitrage_step
  136. while ap.current_state != ap.STATE_COMPLETED and ap.current_state != ap.STATE_FAILED and ap.current_state != ap.STATE_REJECT:
  137. ap.run_arbitrage_step()
  138. ap.run_arbitrage_step()
  139. move_completed_process_to_history(process_id)
  140. # --- 核心數據更新綫程函數 ---
  141. def update_core_data_periodically():
  142. """
  143. 周期性更新 nonce 和 ETH 价格的线程函数。
  144. """
  145. global core_data # 明确表示我们要修改全局的 core_data
  146. while True:
  147. try:
  148. new_eth_price = None
  149. new_eth_balance = None
  150. new_nonce = None
  151. new_block_number = None
  152. new_block = None
  153. # 1. 从 Binance 获取 ETH 价格
  154. if binance_client:
  155. try:
  156. ticker = binance_client.get_symbol_ticker(symbol="ETHUSDT")
  157. new_eth_price = float(ticker['price'])
  158. except Exception as e:
  159. logger.error(f"从 Binance 获取 ETH 价格失败: {e}")
  160. else:
  161. logger.warning("Binance client 未初始化, 无法获取 ETH 价格。")
  162. # 2. 获取最新的block_number
  163. # 确保 w3 已初始化且 USER_WALLET 已配置
  164. if w3 and w3.is_connected():
  165. try:
  166. new_block = w3.eth.get_block('latest')
  167. new_block_number = new_block['number']
  168. except Exception as e:
  169. logger.error(f"获取 BlockNumber 失败: {e}")
  170. elif not (w3 and w3.is_connected()):
  171. logger.warning("Web3 未连接, 无法获取 BlockNumber。")
  172. # 3. 更新共享数据 core_data (使用锁)
  173. # 只有当获取到新数据时才更新,避免不必要的写操作和日志
  174. with core_lock:
  175. if new_eth_price is not None and core_data["eth_price"] != new_eth_price:
  176. eth_price = Decimal(new_eth_price)
  177. eth_price = eth_price.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  178. core_data["eth_price"] = eth_price
  179. # 判斷block_number是否發生變化(升高)
  180. if new_block_number is not None and new_block_number > core_data["block_number"]:
  181. core_data["block_number"] = new_block_number
  182. core_data["block"] = new_block
  183. # 區塊變了才刷新nonce,否則還是要靠本地的緩存維護
  184. if new_nonce is not None and core_data["nonce"] != new_nonce:
  185. core_data["nonce"] = new_nonce
  186. # 餘額也同理
  187. if new_eth_balance is not None and core_data["eth_balance"] != new_eth_balance:
  188. core_data["eth_balance"] = new_eth_balance
  189. # logger.info(f"核心数据已更新: ETH Price = {core_data['eth_price']}, Nonce ({USER_WALLET}) = {core_data['nonce']}, EthBalance={core_data['eth_balance']}, BlockNumber = {core_data['block_number']}")
  190. except Exception as e:
  191. # 捕获线程循环中的其他潜在错误
  192. exc_traceback = traceback.format_exc()
  193. logger.error(f"数据更新线程发生未知错误\n{exc_traceback}")
  194. # traceback.print_exc()
  195. # 等待 500ms
  196. time.sleep(0.5)
  197. # --- mexc數據更新綫程函數 ---
  198. def update_mexc_data_periodically():
  199. """
  200. 周期性更新 mexc的相關數據 的线程函数。
  201. """
  202. global mexc_data
  203. # 每60秒獲取一次coin_info
  204. coin_info_get_delay = 60
  205. while True:
  206. try:
  207. new_account_info = None
  208. new_coin_info_list = None
  209. # 1. new_account_info
  210. try:
  211. new_account_info = mexc.trade.get_account_info()
  212. if 'balances' not in new_account_info:
  213. raise Exception("未找到balances")
  214. with mexc_lock:
  215. mexc_data['account_info'] = new_account_info
  216. # logger.info(f'account_info: {new_account_info['balances']}')
  217. except Exception as e:
  218. logger.error(f"从 Mexc 获取 Balance 失败: {e}, {new_account_info}")
  219. # 2. new_coin_info list
  220. try:
  221. if coin_info_get_delay >= 60:
  222. coin_info_get_delay = 0
  223. new_coin_info_list = mexc.wallet.get_coinlist()
  224. if not isinstance(new_coin_info_list, list):
  225. raise Exception("幣種信息獲取錯誤")
  226. # 處理幣種信息
  227. new_coin_info_map = {}
  228. for coin_info in new_coin_info_list:
  229. new_coin_info_map[coin_info['coin']] = {}
  230. for network in coin_info['networkList']:
  231. new_coin_info_map[coin_info['coin']][network['netWork']] = network
  232. with mexc_lock:
  233. mexc_data['coin_info_map'] = new_coin_info_map
  234. # logger.info(f'coin_info_map: {new_coin_info_map['USDT']}')
  235. except Exception as e:
  236. logger.error(f"从 Mexc 获取 coinlist 失败: {e}, {new_coin_info_list}")
  237. except Exception as e:
  238. # 捕获线程循环中的其他潜在错误
  239. exc_traceback = traceback.format_exc()
  240. logger.error(f"数据更新线程发生未知错误\n{exc_traceback}")
  241. # traceback.print_exc()
  242. # 幣種信息處理的delay
  243. coin_info_get_delay = coin_info_get_delay + 1
  244. # 等待 1s
  245. time.sleep(1)
  246. @app.route('/submit_process', methods=['POST'])
  247. def handle_submit_process():
  248. data = request.get_json()
  249. if not data:
  250. return jsonify({"error": "无效的 JSON 请求体"}), 400
  251. required_fields = ['symbol', 'strategy']
  252. for field in required_fields:
  253. if field not in data:
  254. return jsonify({"error": f"缺少字段: {field}, keys: {data.keys()}"}), 400
  255. symbol = data['symbol'] # 交易对符号
  256. # 检查此交易对此区块是否处理过
  257. last_trade_block = last_process_info.get(symbol)
  258. with core_lock:
  259. current_block = core_data['block_number']
  260. if last_trade_block:
  261. if current_block - last_trade_block < MIN_BLOCKS_BETWEEN_ARB:
  262. return jsonify({"message": f"已跳过: {symbol} 最近已处理 (区块 {last_trade_block}). 当前区块 {current_block}."}), 200
  263. process_id = str(uuid.uuid4()) # 生成唯一流程ID
  264. process_item = copy.deepcopy(data)
  265. process_item['id'] = process_id
  266. process_item['profit'] = Decimal(0)
  267. process_item['creationTime'] = get_formatted_timestamp(), # 创建时间
  268. process_item['stateFlow'] = [] # 状态流转记录
  269. process_item['currentState'] = "PENDING_START"
  270. # 初始状态更新
  271. add_state_flow_entry(process_item, "RECEIVED", f"流程已接收。开始套利。", "success")
  272. with list_lock:
  273. processing_list.append(process_item)
  274. last_process_info[symbol] = current_block
  275. logger.info(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
  276. # 在新线程中开始套利过程
  277. arb_thread = threading.Thread(target=arbitrage_process_flow, args=(process_item,), daemon=True)
  278. arb_thread.start()
  279. return jsonify({"message": "套利过程已启动", "process_id": process_id}), 201
  280. @app.route('/processing', methods=['GET'])
  281. def get_processing_list():
  282. """获取正在处理的任务列表"""
  283. with list_lock:
  284. # 返回一个副本,以避免在迭代生成 JSON 响应时列表被修改的问题
  285. return jsonify(list(processing_list))
  286. @app.route('/history', methods=['GET'])
  287. def get_history_list():
  288. """获取已完成的任务历史列表"""
  289. with list_lock:
  290. return jsonify(list(history_process_list))
  291. @app.route('/status', methods=['GET'])
  292. def get_status():
  293. """获取系统状态概览"""
  294. with list_lock:
  295. return jsonify({
  296. "processing_count": len(processing_list), # 正在处理的任务数量
  297. "history_count": len(history_process_list), # 历史任务数量
  298. # "current_nonce_USER_WALLET_if_managed_here": global_nonce_USER_WALLET, # 示例:如果服务器管理此nonce
  299. "last_process_info": last_process_info # 最后处理信息 (如果使用)
  300. })
  301. @app.route('/download_history_excel', methods=['GET'])
  302. def download_history_excel():
  303. """下载历史记录的 Excel 文件"""
  304. with list_lock:
  305. # 为了线程安全,复制一份列表进行处理
  306. history_data = list(history_process_list)
  307. if not history_data:
  308. # 如果没有数据,可以返回一个空文件或一个提示信息
  309. return "没有历史记录可以下载。", 404
  310. # 1. 使用 pandas 将列表数据转换成 DataFrame
  311. # 假设您的列表中的每个元素都是一个字典,例如:
  312. # {'交易对': 'DORKY_USDT', '利润': 1.23, '创建时间': '...'}
  313. df = pd.DataFrame(history_data)
  314. # (可选) 整理列的顺序或重命名,让 Excel 更美观
  315. # df = df[['创建时间', '交易对', '利润', '最终状态']]
  316. # df.rename(columns={'创建时间': '交易发生时间'}, inplace=True)
  317. # 2. 在内存中创建一个 Excel 文件
  318. output = io.BytesIO()
  319. # 使用 to_excel 方法,并用 openpyxl 引擎
  320. # index=False 表示不把 DataFrame 的索引写入 Excel
  321. with pd.ExcelWriter(output, engine='openpyxl') as writer:
  322. df.to_excel(writer, index=False, sheet_name='History')
  323. # 移动到二进制流的开头
  324. output.seek(0)
  325. # 3. 使用 send_file 将内存中的文件作为附件发送给浏览器
  326. return send_file(
  327. output,
  328. # attachment_filename 是浏览器下载时默认显示的文件名
  329. download_name='history_records.xlsx',
  330. # as_attachment=True 表示作为附件下载,而不是在浏览器中打开
  331. as_attachment=True,
  332. # mimetype 告诉浏览器这是一个 Excel 文件
  333. mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
  334. )
  335. if __name__ == "__main__":
  336. logger.info("启动核心数据更新线程...")
  337. updater_thread = threading.Thread(target=update_core_data_periodically, daemon=True)
  338. updater_thread.start()
  339. logger.info("启动抹茶数据更新线程...")
  340. updater_thread = threading.Thread(target=update_mexc_data_periodically, daemon=True)
  341. updater_thread.start()
  342. logger.info("主线程继续执行,可以执行其他任务或保持运行以观察数据更新。")
  343. logger.info("启动 Flask 套利执行服务器...")
  344. app.run(host='0.0.0.0', port=1888, debug=False) # 使用与 price_checker 不同的端口