as.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. # -*- coding: utf-8 -*-
  2. from checker.logger_config import get_logger
  3. logger = get_logger('as')
  4. logger.info('\n\n----------------------------------------------as啓動--------------------------------------------')
  5. import threading
  6. import uuid # 用于生成唯一的流程ID
  7. import time
  8. import logging
  9. import erc20_to_mexc
  10. import web3_py_client
  11. import traceback
  12. import copy
  13. import sys
  14. from decimal import Decimal, ROUND_DOWN
  15. from flask import Flask, request, jsonify
  16. from flask_cors import CORS # 导入
  17. from as_utils import get_formatted_timestamp
  18. from as_utils import add_state_flow_entry
  19. from config import wallet
  20. from binance.client import Client # 用于获取ETH价格
  21. from checker import ok_chain_client
  22. from pprint import pprint
  23. from pprint import pformat
  24. ok_chain_client.api_config = {
  25. "api_key": 'a05643ab-fb17-402b-94a8-a886bd343301', # 请替换为您的真实 API Key
  26. "secret_key": '9D59B53EB1E60B1B5F290D3698A8C9DA', # 请替换为您的真实 Secret Key
  27. "passphrase": 'Qwe123123.', # 请替换为您的真实 Passphrase
  28. }
  29. # 配置日志
  30. log = logging.getLogger('werkzeug')
  31. log.setLevel(logging.ERROR)
  32. web3 = web3_py_client.EthClient()
  33. w3 = web3.w3
  34. USER_WALLET = wallet["user_wallet"]
  35. USER_EXCHANGE_WALLET = wallet["user_exchange_wallet"]
  36. # 该代币最后一次执行套利的区块信息 (如果需要防止过于频繁的同类套利,不然变成砸盘、拉盘的了)
  37. last_process_info = {} # 示例: {"RATO_USDT": 0}
  38. MIN_BLOCKS_BETWEEN_ARB = Decimal(2) # 在重试相同交易对之前等待几个区块
  39. # --- 全局状态和锁 ---
  40. processing_list = [] # 正在处理的任务列表
  41. history_process_list = [] # 已完成的任务历史列表
  42. list_lock = threading.Lock() # 用于修改 processing_list 和 history_process_list 结构的锁
  43. # --- 一些核心數據和鎖 ---
  44. core_data = {
  45. "nonce": 0, # 全局 Nonce
  46. "block_number": 0, # 全局 區塊號
  47. "eth_price": 0, # eth價格
  48. }
  49. core_lock = threading.Lock() # 核心數據的锁
  50. # --- pending數據和鎖 ---
  51. pending_data = {
  52. # # 數據結構的demo
  53. # "0xaf181bbbf5bf56d9204bd18cd25abd90e51890e848525e7788b410689c0c26a4": {
  54. # "block_number": 22570370, # 提交pending時的區塊,隔幾個區塊去獲取數據會更準確
  55. # "tx_details": None, # okapi解析的數據, None就是還沒有獲取到
  56. # "reponse": None, # okapi最後一次獲取到的數據
  57. # },
  58. }
  59. pending_lock = threading.Lock()
  60. PENDING_CONFIRM_BLOCK = 3 # 需要幾個區塊才進行確認
  61. CHAIN_ID = -1
  62. try:
  63. if w3.provider:
  64. CHAIN_ID = w3.eth.chain_id
  65. logger.info(f"Web3 已连接。chain_id={CHAIN_ID}")
  66. else:
  67. logger.info("Web3 未连接。")
  68. except Exception as e:
  69. logger.info(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}")
  70. # Binance 客户端 (无需API Key/Secret即可获取公开行情数据)
  71. try:
  72. binance_client = Client()
  73. # 测试连接 (可选,但建议)
  74. binance_client.ping()
  75. logger.info("成功连接到 Binance API。")
  76. except Exception as e:
  77. logger.error(f"初始化 Binance Client 时发生错误: {e}")
  78. binance_client = None
  79. # --- Flask 应用 ---
  80. app = Flask(__name__)
  81. CORS(app) # 在创建 app 实例后启用 CORS
  82. def move_completed_process_to_history(process_id_to_move: str) -> bool:
  83. """
  84. 将一个完成的 process_item 从 processing_list 移动到 history_process_list。
  85. 此操作是线程安全的。
  86. Args:
  87. process_id_to_move (str): 要移动的 process_item 的 ID。
  88. Returns:
  89. bool: 如果成功找到并移动了 item,则返回 True,否则返回 False。
  90. """
  91. global processing_list, history_process_list # 因为我们要修改这两个列表
  92. item_to_move = None
  93. moved_successfully = False
  94. with list_lock:
  95. # 查找并从 processing_list 中移除
  96. found_index = -1
  97. for i, item in enumerate(processing_list):
  98. if item.get('id') == process_id_to_move:
  99. found_index = i
  100. break
  101. if found_index != -1:
  102. item_to_move = processing_list.pop(found_index) # 从 processing_list 中移除并获取它
  103. # 假设在 item_to_move 中,其 currentState 已经被 arbitrage_process_flow 更新为 COMPLETED 或 FAILED
  104. # arbitrage_process.add_state_flow_entry(item_to_move, "MOVED_TO_HISTORY", f"流程处理完毕,移至历史记录。最终状态: {item_to_move.get('currentState', 'N/A')}", "info")
  105. history_process_list.append(item_to_move) # 添加到 history_process_list
  106. logger.info(f"已将 process_id: {process_id_to_move} 从 processing_list 移动到 history_process_list。")
  107. moved_successfully = True
  108. else:
  109. logger.warning(f"尝试移动到 history_list 时,在 processing_list 中未找到 process_id: {process_id_to_move}")
  110. return moved_successfully
  111. # 實際套利邏輯
  112. def arbitrage_process_flow(process_item):
  113. """
  114. 在单独线程中执行的实际套利逻辑。
  115. 会直接修改 'process_item' 字典。
  116. """
  117. process_id = process_item['id']
  118. SYMBOL = process_item['symbol']
  119. tx = process_item['tx'] # 预期包含 'rawTransaction' (原始交易)
  120. FROM_TOKEN = process_item['fromToken']
  121. TO_TOKEN = process_item['toToken']
  122. FROM_TOKEN_AMOUNT_HUMAM = process_item['fromTokenAmountHuman']
  123. TO_TOKEN_AMOUNT_HUMAM = process_item['toTokenAmountHuman']
  124. profit = Decimal(process_item['profit'])
  125. profitLimit = Decimal(process_item['profitLimit'])
  126. USER_EXCHANGE_WALLET = process_item['userExchangeWallet']
  127. USER_WALLET = process_item['userWallet']
  128. SYMBOL = process_item['symbol']
  129. EXCHANGE_OUT_AMOUNT = process_item['exchangeOutAmount']
  130. gas_limit_multiplier = 1
  131. gas_price_multiplier = 1
  132. if profit > Decimal(2) * profitLimit:
  133. gas_price_multiplier = 1.2
  134. elif profit > Decimal(5) * profitLimit:
  135. gas_price_multiplier = 1.5
  136. elif profit > Decimal(10) * profitLimit:
  137. gas_price_multiplier = 2
  138. global core_data
  139. global core_lock
  140. global pending_data
  141. global pending_lock
  142. ap = erc20_to_mexc.ArbitrageProcess(tx, gas_limit_multiplier, gas_price_multiplier, process_item,
  143. core_data, core_lock,
  144. pending_data, pending_lock,
  145. )
  146. # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
  147. ap._set_state(ap.STATE_CHECK)
  148. # 在主循环中周期性调用 run_arbitrage_step
  149. while ap.current_state != ap.STATE_COMPLETED and ap.current_state != ap.STATE_FAILED and ap.current_state != ap.STATE_REJECT:
  150. ap.run_arbitrage_step()
  151. if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
  152. time.sleep(10)
  153. ap.run_arbitrage_step()
  154. move_completed_process_to_history(process_id)
  155. # --- 核心數據更新綫程函數 ---
  156. def update_core_data_periodically():
  157. """
  158. 周期性更新 nonce 和 ETH 价格的线程函数。
  159. """
  160. global core_data # 明确表示我们要修改全局的 core_data
  161. if not USER_WALLET or USER_WALLET == "你的钱包地址":
  162. logger.error("USER_WALLET 未正确配置。nonce 更新将无法进行。")
  163. # 如果 USER_WALLET 未配置,可以考虑让线程不执行 nonce 更新,或者直接退出
  164. # 这里我们选择继续运行,但 nonce 不会被更新
  165. while True:
  166. try:
  167. new_eth_price = None
  168. new_nonce = None
  169. new_block_number = None
  170. # 1. 从 Binance 获取 ETH 价格
  171. if binance_client:
  172. try:
  173. ticker = binance_client.get_symbol_ticker(symbol="ETHUSDT")
  174. new_eth_price = float(ticker['price'])
  175. except Exception as e:
  176. logger.error(f"从 Binance 获取 ETH 价格失败: {e}")
  177. else:
  178. logger.warning("Binance client 未初始化, 无法获取 ETH 价格。")
  179. # 2. 获取最新的 Nonce 和 最新的block_number
  180. # 确保 w3 已初始化且 USER_WALLET 已配置
  181. if w3 and w3.is_connected() and USER_WALLET and USER_WALLET != "你的钱包地址":
  182. try:
  183. new_block_number = w3.eth.block_number
  184. new_nonce = w3.eth.get_transaction_count(USER_WALLET, 'latest')
  185. except Exception as e:
  186. logger.error(f"为 {USER_WALLET} 获取 Nonce 失败: {e}")
  187. elif not (w3 and w3.is_connected()):
  188. logger.warning("Web3 未连接, 无法获取 nonce。")
  189. elif not (USER_WALLET and USER_WALLET != "你的钱包地址"):
  190. logger.warning("USER_WALLET 配置不正确, 无法获取 nonce。")
  191. # 3. 更新共享数据 core_data (使用锁)
  192. # 只有当获取到新数据时才更新,避免不必要的写操作和日志
  193. with core_lock:
  194. if new_eth_price is not None and core_data["eth_price"] != new_eth_price:
  195. eth_price = Decimal(new_eth_price)
  196. eth_price = eth_price.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  197. core_data["eth_price"] = eth_price
  198. # 判斷block_number是否發生變化(升高)
  199. if new_block_number is not None and new_block_number > core_data["block_number"]:
  200. core_data["block_number"] = new_block_number
  201. # 區塊變了才刷新nonce,否則還是要靠本地的緩存維護
  202. if new_nonce is not None and core_data["nonce"] != new_nonce:
  203. core_data["nonce"] = new_nonce
  204. # logger.info(f"核心数据已更新: ETH Price = {core_data['eth_price']}, Nonce ({USER_WALLET}) = {core_data['nonce']}, BlockNumber = {core_data['block_number']}")
  205. except Exception as e:
  206. # 捕获线程循环中的其他潜在错误
  207. logger.error(f"数据更新线程发生未知错误: {e}")
  208. traceback.print_exc()
  209. # 等待 500ms
  210. time.sleep(0.5)
  211. # --- tx pending數據獲取綫程函數 ---
  212. def update_tx_data():
  213. """
  214. 每一秒獲取一條tx數據
  215. """
  216. global pending_data # 明确表示我们要修改全局的 pending_data
  217. while True:
  218. # 等待1s
  219. time.sleep(1)
  220. try:
  221. # 使用拷貝后的數據,否則可能會出現綫程問題
  222. with pending_lock:
  223. pending_data_copy = copy.deepcopy(pending_data)
  224. # 核心數據同理
  225. with core_lock:
  226. core_data_copy = copy.deepcopy(core_data)
  227. block_number = core_data_copy['block_number']
  228. for tx in pending_data_copy:
  229. try:
  230. # 已獲取的就不要再獲取了
  231. if pending_data_copy[tx]['tx_details'] is not None:
  232. continue
  233. # PENDING_CONFIRM_BLOCK個區塊之後的才進行確認,防止回滾頻繁觸發
  234. if block_number < pending_data_copy[tx]['block_number'] + PENDING_CONFIRM_BLOCK:
  235. continue
  236. # 調用ok的api,直接獲取詳細交易
  237. ok_rst = ok_chain_client.history(CHAIN_ID, tx)
  238. # 存儲最後一次獲取的細節
  239. with pending_lock:
  240. pending_data[tx]['response'] = ok_rst
  241. # 錯誤響應
  242. if ok_rst['code'] != '0':
  243. raise RuntimeError("API 返回错误响应", ok_rst)
  244. # ok不一定那麽快獲取到
  245. if ok_rst['data'] is None:
  246. # 每一個之間等待1s
  247. time.sleep(1)
  248. continue
  249. details = ok_rst['data']
  250. # 有時候不會馬上識別出成交數量
  251. if details['fromTokenDetails'] is None or details['toTokenDetails']:
  252. # 每一個之間等待1s
  253. time.sleep(1)
  254. continue
  255. # 成功獲取之後直接調用更新
  256. with pending_lock:
  257. pending_data[tx]['tx_details'] = details
  258. formated_data = pformat(ok_rst['data'], indent=2) # indent=2 让格式更整齐
  259. logger.info(f"獲取成功: \n{formated_data}")
  260. except Exception as e:
  261. logger.error(f"tx數據獲取失敗: {e}")
  262. traceback.print_exc()
  263. # 每一個之間等待1s
  264. time.sleep(1)
  265. except Exception as e:
  266. logger.error(f"pending更新线程发生未知错误: {e}")
  267. traceback.print_exc()
  268. @app.route('/submit_process', methods=['POST'])
  269. def handle_submit_process():
  270. data = request.get_json()
  271. if not data:
  272. return jsonify({"error": "无效的 JSON 请求体"}), 400
  273. required_fields = ['tx', 'profit', 'profitLimit', 'symbol', 'fromToken', 'fromTokenAmountHuman', 'fromTokenDecimal', 'toToken', 'toTokenAmountHuman', 'exchangeOutAmount']
  274. for field in required_fields:
  275. if field not in data:
  276. return jsonify({"error": f"缺少字段: {field}"}), 400
  277. try:
  278. profit = Decimal(str(data['profit'])) # 利润
  279. profit_limit = Decimal(str(data['profitLimit'])) # 利润阈值
  280. from_token_amount_human = Decimal(str(data['fromTokenAmountHuman'])) # fromToken 的人类可读数量
  281. from_token_decimal = Decimal(str(data['fromTokenDecimal'])) # fromToken 的小数位数
  282. to_token_amount_human = Decimal(str(data['toTokenAmountHuman'])) # toToken 的人类可读数量
  283. to_token_decimal = Decimal(str(data['toTokenDecimal'])) # fromToken 的小数位数
  284. exchange_out_amount = Decimal(str(data['exchangeOutAmount'])) # 交易所需要卖出的数量
  285. except (decimal.InvalidOperation, ValueError) as e:
  286. return jsonify({"error": f"请求体中包含无效的小数/整数值: {e}"}), 400
  287. symbol = data['symbol'] # 交易对符号
  288. # 检查此交易对此区块是否处理过
  289. last_trade_block = last_process_info.get(symbol)
  290. with core_lock:
  291. current_block = core_data['block_number']
  292. if last_trade_block:
  293. if current_block - last_trade_block < MIN_BLOCKS_BETWEEN_ARB:
  294. return jsonify({"message": f"已跳过: {symbol} 最近已处理 (区块 {last_trade_block}). 当前区块 {current_block}."}), 200
  295. if profit >= profit_limit:
  296. process_id = str(uuid.uuid4()) # 生成唯一流程ID
  297. process_item = {
  298. "id": process_id,
  299. "creationTime": get_formatted_timestamp(), # 创建时间
  300. "tx": data['tx'], # 交易详情,应包含 rawTransaction
  301. "profit": str(profit), # 利润 (字符串存储)
  302. "profitLimit": str(profit_limit), # 利润阈值 (字符串存储)
  303. "symbol": symbol, # 交易对
  304. "userWallet": USER_WALLET,
  305. "userExchangeWallet": USER_EXCHANGE_WALLET,
  306. "fromToken": data['fromToken'], # 起始代币
  307. "fromTokenAmountHuman": str(from_token_amount_human), # 起始代币数量 (人类可读, 字符串存储)
  308. "fromTokenDecimal": str(from_token_decimal), # 起始代币小数位数
  309. "toTokenAmountHuman": str(to_token_amount_human),
  310. "toTokenDecimal": str(to_token_decimal),
  311. "exchangeOutAmount": str(exchange_out_amount),
  312. "toToken": data['toToken'], # 目标代币
  313. "stateFlow": [], # 状态流转记录
  314. "currentState": "PENDING_START", # 当前状态
  315. }
  316. # 初始状态更新
  317. add_state_flow_entry(process_item, "RECEIVED", f"流程已接收。利润 {profit} >= 利润阈值 {profit_limit}。开始套利。", "success")
  318. with list_lock:
  319. processing_list.append(process_item)
  320. last_process_info[symbol] = current_block
  321. logger.info(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
  322. # 在新线程中开始套利过程
  323. arb_thread = threading.Thread(target=arbitrage_process_flow, args=(process_item,), daemon=True)
  324. arb_thread.start()
  325. return jsonify({"message": "套利过程已启动", "process_id": process_id}), 201
  326. else:
  327. return jsonify({"message": f"利润 {profit} 小于利润阈值 {profit_limit}。不处理。"}), 200
  328. @app.route('/processing', methods=['GET'])
  329. def get_processing_list():
  330. """获取正在处理的任务列表"""
  331. with list_lock:
  332. # 返回一个副本,以避免在迭代生成 JSON 响应时列表被修改的问题
  333. return jsonify(list(processing_list))
  334. @app.route('/history', methods=['GET'])
  335. def get_history_list():
  336. """获取已完成的任务历史列表"""
  337. with list_lock:
  338. return jsonify(list(history_process_list))
  339. @app.route('/status', methods=['GET'])
  340. def get_status():
  341. """获取系统状态概览"""
  342. with list_lock:
  343. return jsonify({
  344. "processing_count": len(processing_list), # 正在处理的任务数量
  345. "history_count": len(history_process_list), # 历史任务数量
  346. # "current_nonce_USER_WALLET_if_managed_here": global_nonce_USER_WALLET, # 示例:如果服务器管理此nonce
  347. "last_process_info": last_process_info # 最后处理信息 (如果使用)
  348. })
  349. if __name__ == "__main__":
  350. logger.info("启动核心数据更新线程...")
  351. updater_thread = threading.Thread(target=update_core_data_periodically, daemon=True)
  352. updater_thread.start()
  353. logger.info("启动pending信息獲取线程...")
  354. pending_thread = threading.Thread(target=update_tx_data, daemon=True)
  355. pending_thread.start()
  356. logger.info("主线程继续执行,可以执行其他任务或保持运行以观察数据更新。")
  357. logger.info("启动 Flask 套利执行服务器...")
  358. app.run(host='0.0.0.0', port=188, debug=False) # 使用与 price_checker 不同的端口