erc20_to_mexc_checker_ws.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716
  1. import requests
  2. import time
  3. import threading
  4. import json
  5. import logging
  6. import ok_chain_client # 假设这是你的 OKX Chain 客户端库
  7. import pprint
  8. import plotly.graph_objects as go
  9. import argparse
  10. import signal
  11. import traceback
  12. import copy
  13. from pprint import pformat
  14. from decimal import Decimal, ROUND_DOWN
  15. from mexc_ws_client import MexcWSClient
  16. from typing import Callable, Any, Dict, List, Tuple
  17. from flask import Flask, render_template, jsonify
  18. from collections import deque
  19. from plotly.utils import PlotlyJSONEncoder
  20. # configs
  21. from config import wallet
  22. from config import okchain_api
  23. from config import arb
  24. # 排序及深度處理
  25. import bisect
  26. from typing import List
  27. # logs
  28. from logger_config import get_logger
  29. logger = get_logger('as')
  30. # ok web3的配置
  31. ok_chain_client.api_config = okchain_api # 假设ok_chain_client有此配置方式
  32. # --- 配置 arb_executor.py 的 HTTP 地址和端口 ---
  33. ARB_EXECUTOR_URL = arb["ARB_EXECUTOR_URL"]
  34. # --- 配置部分 ---
  35. # IN_AMOUNT_TO_QUERY 将在循环中动态确定
  36. EXCHANGE_OUT_AMOUNT = Decimal(str(arb["EXCHANGE_OUT_AMOUNT"])) # 确保是Decimal
  37. PROFIT_LIMIT = Decimal(str(arb["PROFIT_LIMIT"])) # 确保是Decimal
  38. IN_TOKEN_ADDRESS = arb["IN_TOKEN_ADDRESS"]
  39. IN_TOKEN_DECIMALS = arb["IN_TOKEN_DECIMALS"]
  40. OUT_TOKEN_ADDRESS = arb["OUT_TOKEN_ADDRESS"]
  41. SLIPPAGE = arb["SLIPPAGE"]
  42. MEXC_TARGET_PAIR_USDT = arb["MEXC_TARGET_PAIR_USDT"]
  43. CHAIN_ID = arb["CHAIN_ID"]
  44. # 錢包的配置
  45. USER_WALLET = wallet["user_wallet"]
  46. USER_EXCHANGE_WALLET = wallet["user_exchange_wallet"]
  47. proxies = None # {'http': 'http://proxy_url:port', 'https': 'http://proxy_url:port'}
  48. # 運行模式【trade、view】
  49. mode = None
  50. # oo_price_usdt_per_target = None # 这个全局变量似乎没有被有效使用,价格在循环内获取
  51. # 配置請求的日志等級
  52. app = Flask(__name__)
  53. log = logging.getLogger('werkzeug')
  54. log.setLevel(logging.ERROR)
  55. REFRESH_INTERVAL_SECONDS = 1 # 稍微增加间隔以减少API调用频率
  56. MAX_HISTORY_POINTS_PLOTLY = 21600
  57. historical_data_points = deque(maxlen=MAX_HISTORY_POINTS_PLOTLY)
  58. TARGET_ASSET_SYMBOL = MEXC_TARGET_PAIR_USDT.split('_')[0] # e.g., RATO
  59. BASE_CURRENCY_SYMBOL = MEXC_TARGET_PAIR_USDT.split('_')[1] # e.g., USDT (assumed to be consistent with IN_TOKEN_ADDRESS)
  60. # --- 全局深度数据和锁 ---
  61. # 初始化一个空的深度结构
  62. depth: Dict[str, Any] = {
  63. "lastUpdateId": None, # 可以考虑用事件ID或时间戳更新
  64. "bids": [], # 列表中的每个元素是 [price_str, volume_str]
  65. "asks": [] # 列表中的每个元素是 [price_str, volume_str]
  66. }
  67. depth_lock = threading.Lock() # 用于保护对 depth 数据的并发访问
  68. # 對於每次深度數據更新,都至少要更新一下該變量
  69. in_amount_to_query_human = Decimal(300)
  70. in_amount_lock = threading.Lock()
  71. # 鏈上數據的構造鎖結構
  72. chain_data = {}
  73. chain_data_lock = threading.Lock()
  74. # 圖表數據
  75. latest_values_for_table = {
  76. f"oo_price_usdt_per_target": "N/A",
  77. f"mexc_price_target_per_base": "N/A", # MEXC Bid1 (converted to USDT/TARGET)
  78. f"diff_oo_vs_mexc_bid1_percentage": "N/A",
  79. "profit_value_for_table": "N/A", # 新增:用于表格的利润值
  80. "oo_error": None, "mexc_error": None,
  81. "last_updated": "N/A",
  82. "mexc_pair_usdt_for_display": MEXC_TARGET_PAIR_USDT,
  83. "target_asset_symbol_for_display": TARGET_ASSET_SYMBOL,
  84. "base_currency_symbol_for_display": BASE_CURRENCY_SYMBOL
  85. }
  86. data_lock = threading.Lock()
  87. # 1秒一次添加數據到圖表
  88. last_insert_time = 0
  89. last_insert_time_lock = threading.Lock()
  90. # --- 兩個綫程,分別獲取depth以及鏈上數據
  91. def update_depth_data_periodically():
  92. global depth
  93. while True:
  94. # 等待 1 分鐘
  95. time.sleep(60)
  96. try:
  97. new_depth = get_depth(MEXC_TARGET_PAIR_USDT)
  98. if 'bids' in new_depth and new_depth['bids']:
  99. with depth_lock:
  100. depth = new_depth
  101. else:
  102. logger.error(f"depth獲取失敗: {new_depth}")
  103. except Exception as e:
  104. # 捕获线程循环中的其他潜在错误
  105. logger.error(f"depth数据更新线程发生未知错误: {e}")
  106. traceback.print_exc()
  107. def update_chain_data_periodically():
  108. global chain_data
  109. chain_id = CHAIN_ID
  110. in_token_addr = IN_TOKEN_ADDRESS
  111. out_token_addr = OUT_TOKEN_ADDRESS
  112. in_token_decimals = IN_TOKEN_DECIMALS
  113. slippage = SLIPPAGE
  114. user_wallet_addr = USER_WALLET
  115. user_exchange_wallet_addr = USER_EXCHANGE_WALLET
  116. while True:
  117. try:
  118. with in_amount_lock:
  119. amount_in_base_human = Decimal(str(in_amount_to_query_human))
  120. in_token_amount_atomic = int(amount_in_base_human * (10 ** in_token_decimals)) # 转为原子单位整数
  121. data = ok_chain_client.swap(chain_id, in_token_amount_atomic, in_token_addr, out_token_addr, slippage, user_wallet_addr, user_exchange_wallet_addr, 'fast')
  122. if data.get('code') == '0' and data.get('data'):
  123. with chain_data_lock:
  124. chain_data = data
  125. else:
  126. logger.info(f"鏈上數據獲取錯誤:{data}")
  127. except Exception as e:
  128. # 捕获线程循环中的其他潜在错误
  129. logger.error(f"鏈上数据更新线程发生未知错误: {e}")
  130. traceback.print_exc()
  131. # 等待 0.1 秒 測試
  132. time.sleep(0.1)
  133. # --- 深度维护逻辑 ---
  134. def update_level(levels_list: List[List[str]], price_str: str, volume_str: str, is_bids: bool):
  135. """
  136. 高效更新单个订单簿边(bids 或 asks)的特定价格水平。
  137. Args:
  138. levels_list: 当前的 bids 或 asks 列表,已按价格排序(bids 降序,asks 升序)。
  139. price_str: 新价格 (字符串)。
  140. volume_str: 新数量 (字符串)。
  141. is_bids: 布尔值,True 表示 bids (按价格降序),False 表示 asks (按价格升序)。
  142. """
  143. price_to_update = float(price_str)
  144. volume_to_update = float(volume_str)
  145. new_level_entry = [price_str, volume_str]
  146. # 1. 尝试查找现有的价格水平 (线性扫描,但对于深度有限的列表通常够快)
  147. # 如果深度非常大 (几百上千档),则二分查找会更好。
  148. found_at_index = -1
  149. for i, level_data in enumerate(levels_list):
  150. if float(level_data[0]) == price_to_update:
  151. found_at_index = i
  152. break
  153. # 2. 如果数量为0,则移除
  154. if volume_to_update == 0.0:
  155. if found_at_index != -1:
  156. levels_list.pop(found_at_index)
  157. # 3. 如果数量不为0
  158. else:
  159. if found_at_index != -1: # 价格已存在,更新数量
  160. levels_list[found_at_index][1] = volume_str
  161. else: # 价格不存在,需要插入以保持排序
  162. if is_bids:
  163. # Bids: 价格降序 (高价在前)
  164. # 找到插入点:第一个使得 level_price <= price_to_update 的位置
  165. insert_idx = 0
  166. while insert_idx < len(levels_list) and float(levels_list[insert_idx][0]) > price_to_update:
  167. insert_idx += 1
  168. levels_list.insert(insert_idx, new_level_entry)
  169. else:
  170. # Asks: 价格升序 (低价在前)
  171. # 找到插入点:第一个使得 level_price >= price_to_update 的位置
  172. # 使用 bisect.bisect_left 获取 prices (只包含价格的列表) 的插入点
  173. # 如果不创建临时 prices 列表,也可以直接迭代:
  174. insert_idx = 0
  175. while insert_idx < len(levels_list) and float(levels_list[insert_idx][0]) < price_to_update:
  176. insert_idx += 1
  177. levels_list.insert(insert_idx, new_level_entry)
  178. def update_depth_from_incremental(message_data: Dict[str, Any]):
  179. """
  180. 根据增量深度消息更新全局 depth 结构。
  181. Args:
  182. message_data (Dict[str, Any]): 从 WebSocket 收到的已解析的JSON数据,
  183. 期望包含 'c' (channel) 和 'd' (data) 字段。
  184. 'd' 应该包含 'bids' 和/或 'asks' 以及 'r' (推流ID)。
  185. """
  186. global depth, depth_lock
  187. # 检查是否是我们期望的增量深度消息
  188. channel = message_data.get('c')
  189. if not channel or "spot@public.increase.depth.v3.api" not in channel:
  190. # print(f"[DEPTH_UPDATE] Received non-depth message or unexpected channel: {channel}")
  191. return
  192. data_part = message_data.get('d')
  193. if not data_part or not isinstance(data_part, dict):
  194. print(f"[DEPTH_UPDATE] Incremental depth message 'd' part is missing or not a dict: {message_data}")
  195. return
  196. # 获取推流 ID,用于更新 lastUpdateId (如果需要严格同步,这里可能需要更复杂的逻辑)
  197. # r = message_data['d'].get('r') # 这个 'r' 是MEXC的推流编号, 不是UpdateID
  198. # 对于MEXC,增量深度通常不直接提供严格意义上的 Binance 那样的 U (firstUpdateId) 和 u (finalUpdateId)
  199. # 如果需要严格校对,通常需要先拉取一次全量快照,然后基于快照的 eventId/versionId 来应用增量
  200. # 这里我们简单地用最新的推流ID r (如果可用) 或者 时间戳 来标记更新
  201. # MEXC的 `spot@public.increase.depth.v3.api` 返回的 `r` 字段是唯一的推流ID。
  202. stream_id = data_part.get('r') # 这个r是推流ID,可以用它来粗略判断顺序
  203. # 更新 lastUpdateId (用推流ID或时间戳)
  204. if stream_id:
  205. # 注意:如果 depth['lastUpdateId'] 之前是全量快照的 eventId,
  206. # 那么这里的推流ID r 的序列性需要搞清楚。
  207. # 如果只是简单地取最大值,那也可以。
  208. with depth_lock:
  209. if depth.get("lastUpdateId") is None or int(stream_id) > int(depth.get("lastUpdateId", 0)):
  210. depth["lastUpdateId"] = stream_id
  211. else:
  212. with depth_lock:
  213. depth["lastUpdateId"] = int(time.time() * 1000) # 或者使用时间戳
  214. # 更新 bids
  215. if 'bids' in data_part and isinstance(data_part['bids'], list):
  216. for bid_update in data_part['bids']:
  217. if isinstance(bid_update, dict) and 'p' in bid_update and 'v' in bid_update:
  218. with depth_lock:
  219. update_level(depth['bids'], bid_update['p'], bid_update['v'], is_bids=True)
  220. # 如果更新的是買盤數據,則執行數據更新流程
  221. update_data_for_plotly_and_table()
  222. else:
  223. print(f"[DEPTH_UPDATE] Invalid bid update format: {bid_update}")
  224. # 更新 asks,該策略的賣盤數據不處理
  225. # if 'asks' in data_part and isinstance(data_part['asks'], list):
  226. # for ask_update in data_part['asks']:
  227. # if isinstance(ask_update, dict) and 'p' in ask_update and 'v' in ask_update:
  228. # with depth_lock:
  229. # update_level(depth['asks'], ask_update['p'], ask_update['v'], is_bids=False)
  230. # else:
  231. # print(f"[DEPTH_UPDATE] Invalid ask update format: {ask_update}")
  232. # 可选:限制深度列表的长度,例如只保留前N档
  233. # with depth_lock:
  234. # depth['bids'] = depth['bids'][:50]
  235. # depth['asks'] = depth['asks'][:50]
  236. # # 打印更新后的深度信息 (用于调试)
  237. # delay = time.time() * 1000 - message_data['t']
  238. # print_current_depth(delay)
  239. def print_current_depth(delay, max_levels_to_print=5):
  240. """打印当前深度信息 (前几档)"""
  241. with depth_lock:
  242. print(f"--- Current Depth (lastUpdateId: {depth.get('lastUpdateId')}, Delay={delay}) ---")
  243. print("\nAsks (Price, Volume):")
  244. asks_to_print = depth['asks'][:max_levels_to_print]
  245. for i in range(len(asks_to_print) -1, -1, -1):
  246. price, volume = asks_to_print[i]
  247. print(f" S{i+1}: {price} - {volume}") # S{原始档位号}
  248. if not depth['asks']:
  249. print(" (No asks)")
  250. print("--------------------------\n")
  251. print("Bids (Price, Volume):")
  252. for i, (price, volume) in enumerate(depth['bids'][:max_levels_to_print]):
  253. print(f" B{i+1}. {price} - {volume}")
  254. if not depth['bids']:
  255. print(" (No bids)")
  256. def get_depth(symol, limit=5000):
  257. url = "https://api.mexc.com/api/v3/depth"
  258. params = {'symbol': symol.replace('_', ''), 'limit': limit}
  259. try:
  260. r = requests.get(url, params=params, proxies=proxies, timeout=5) # 减少超时
  261. r.raise_for_status()
  262. return r.json()
  263. except requests.exceptions.RequestException as e:
  264. # logger.error(f"MEXC现货({pair_symbol})请求错误: {e}")
  265. return {"error": f"MEXC现货({pair_symbol})请求错误: {e}"}
  266. except Exception as e:
  267. # logger.error(f"MEXC现货({pair_symbol})处理错误: {e}", exc_info=True)
  268. return {"error": f"MEXC现货({pair_symbol})处理错误: {e}"}
  269. # --- 链上价格获取函数 (Okx) ---
  270. # 返回: price_base_per_target (例如 USDT per RATO)
  271. def get_chain_price_vs_target_currency():
  272. try:
  273. with chain_data_lock:
  274. data = copy.deepcopy(chain_data)
  275. if data.get('code') == '0' and data.get('data'):
  276. d = data['data'][0]
  277. router_result = d['routerResult']
  278. in_dec, out_dec = int(router_result['fromToken']['decimal']), int(router_result['toToken']['decimal'])
  279. atomic_in_base, atomic_out_target = Decimal(router_result['fromTokenAmount']), Decimal(router_result['toTokenAmount'])
  280. human_in_base = atomic_in_base / (10 ** in_dec)
  281. human_out_target = atomic_out_target / (10 ** out_dec)
  282. if human_out_target == 0: return {"error": f"OO输出目标代币为0 ({CHAIN_ID})"}, data # data 也返回
  283. return {"price_base_per_target": human_in_base / human_out_target}, data
  284. else:
  285. logger.info(f"chain數據獲取問題: {data}")
  286. return {
  287. "error": f"Okx API错误 - Code:{data.get('code', 'N/A')}, Msg:{data.get('msg', data.get('message', 'N/A')) if isinstance(data, dict) else '格式错误'}"}, None
  288. except Exception as e:
  289. logger.error(f"Okx请求错误详情: ", exc_info=True)
  290. return {"error": f"Okx请求错误: {e}"}, None
  291. # MEXC 现货 (获取 目标代币/USDT 的 bid 价格)
  292. # 返回: price_target_per_usdt_bid1 (例如 RATO per USDT)
  293. def get_mexc_spot_price_target_usdt_bid():
  294. try:
  295. with depth_lock:
  296. depth_copy = copy.deepcopy(depth)
  297. if 'bids' in depth_copy and depth_copy['bids']: # 确保bids存在且不为空
  298. bids = depth_copy['bids']
  299. trade_volume_remaining = EXCHANGE_OUT_AMOUNT # 还需要卖出的数量 (Decimal)
  300. trade_value = Decimal('0') # 累计的总价值 (Decimal)
  301. accumulated_volume = Decimal('0') # 累计吃单量
  302. for orderbook in bids:
  303. price = Decimal(orderbook[0])
  304. volume = Decimal(orderbook[1])
  305. if trade_volume_remaining <= Decimal('0'):
  306. break # 已经满足卖出量
  307. can_fill = min(volume, trade_volume_remaining)
  308. trade_value += price * can_fill
  309. accumulated_volume += can_fill
  310. trade_volume_remaining -= can_fill
  311. if accumulated_volume == Decimal('0'): # 如果一点都没卖出去
  312. # logger.warning(f"MEXC无法以EXCHANGE_OUT_AMOUNT={EXCHANGE_OUT_AMOUNT}获取任何 efectiva 卖出价格,累积量为0")
  313. return {"error": f"MEXC订单簿深度不足以卖出{EXCHANGE_OUT_AMOUNT} {TARGET_ASSET_SYMBOL}"}, Decimal('0')
  314. # 计算平均卖出价格
  315. # sell_price 代表 1 TARGET_ASSET = X USDT
  316. sell_price = trade_value / accumulated_volume
  317. sell_price = sell_price.quantize(Decimal('1e-15'), rounding=ROUND_DOWN)
  318. # trade_value 代表卖出 accumulated_volume 个 TARGET_ASSET 能得到的 USDT 总量
  319. return {
  320. "price_target_per_usdt_bid1": sell_price # 这个名字其实是 RATO/USDT,所以可以叫 price_target_per_base
  321. }, trade_value # 返回的是实际能卖出 EXCHANGE_OUT_AMOUNT (或更少,如果深度不足) 所得的 USDT 总额
  322. else:
  323. # logger.warning(f"MEXC现货({pair_symbol}) bids 数据不存在或为空: {data}")
  324. return {"error": f"MEXC现货({pair_symbol}) bids 数据不存在或为空"}, Decimal('0')
  325. except requests.exceptions.RequestException as e:
  326. # logger.error(f"MEXC现货({pair_symbol})请求错误: {e}")
  327. return {"error": f"MEXC现货({pair_symbol})请求错误: {e}"}, Decimal('0')
  328. except Exception as e:
  329. # logger.error(f"MEXC现货({pair_symbol})处理错误: {e}", exc_info=True)
  330. return {"error": f"MEXC现货({pair_symbol})处理错误: {e}"}, Decimal('0')
  331. def calculate_percentage_diff(price_a_base_per_target, price_b_base_per_target):
  332. # price_a: MEXC卖价 (USDT/TARGET) - 链上买的目标币,拿到CEX卖掉
  333. # price_b: 链上买价 (USDT/TARGET) - 链上用USDT买目标币
  334. # 期望 price_a > price_b
  335. if price_a_base_per_target is not None and price_b_base_per_target is not None and \
  336. isinstance(price_a_base_per_target, Decimal) and \
  337. isinstance(price_b_base_per_target, Decimal) and price_b_base_per_target != 0:
  338. # (卖价 - 买价) / 买价
  339. rst = (price_a_base_per_target - price_b_base_per_target) / price_b_base_per_target
  340. rst = rst.quantize(Decimal('1e-6'), rounding=ROUND_DOWN) # 提高精度
  341. return rst
  342. return None
  343. def send_arb_msg(profit_amount, chain_swap_data, mexc_price_usdt_per_target, in_amount_copy):
  344. # chain_swap_data 是从 get_chain_price_vs_target_currency 返回的第二个值
  345. if not (chain_swap_data and chain_swap_data.get('data') and chain_swap_data['data']):
  346. logger.error(f"套利消息发送失败:链上交易数据不完整 {chain_swap_data}")
  347. return
  348. d = chain_swap_data['data'][0]
  349. tx = d['tx'] # 这是预签名的交易结构体,不是tx hash
  350. router_result = d['routerResult']
  351. from_token_info = router_result['fromToken']
  352. to_token_info = router_result['toToken']
  353. in_dec, out_dec = int(from_token_info['decimal']), int(to_token_info['decimal'])
  354. # human_in_base 根据实际传入的 IN_AMOUNT_TO_QUERY (trade_value) 确定
  355. # human_out_target 是链上swap的实际输出
  356. atomic_out_target = Decimal(router_result['toTokenAmount'])
  357. human_out_target = atomic_out_target / (10 ** out_dec)
  358. arbitrage_data = {
  359. "tx": tx, # 预签名交易
  360. "profit": str(profit_amount.quantize(Decimal('0.001'))),
  361. "profitLimit": str(PROFIT_LIMIT.quantize(Decimal('0.001'))),
  362. # "mexcPriceUsdtPerTarget": str(mexc_price_usdt_per_target.quantize(Decimal('1e-8'))),
  363. "symbol": MEXC_TARGET_PAIR_USDT,
  364. "fromToken": IN_TOKEN_ADDRESS,
  365. "fromTokenAmountHuman": str(in_amount_copy.quantize(Decimal(f'1e-{in_dec}'))),
  366. "fromTokenDecimal": str(in_dec),
  367. "toToken": OUT_TOKEN_ADDRESS,
  368. "toTokenAmountHuman": str(human_out_target.quantize(Decimal(f'1e-{out_dec}'))),
  369. "toTokenDecimal": str(out_dec),
  370. "exchangeOutAmount": str(EXCHANGE_OUT_AMOUNT.quantize(Decimal(f'1e-{out_dec}'))), # CEX上期望卖出的目标币数量
  371. "strategy": "erc20_to_mexc",
  372. }
  373. logger.info(f"正在提交套利数据到 {ARB_EXECUTOR_URL}, profit {arbitrage_data["profit"]}, profitLimit {arbitrage_data["profitLimit"]}, keys: {arbitrage_data.keys()}")
  374. try:
  375. response = requests.post(ARB_EXECUTOR_URL, json=arbitrage_data, timeout=10)
  376. logger.info(f"套利执行器响应状态码: {response.status_code}")
  377. try:
  378. response_data = response.json()
  379. logger.info(f"套利执行器响应内容: {response_data}")
  380. except requests.exceptions.JSONDecodeError:
  381. logger.error(f"套利执行器响应无法解析为JSON: {response.text}")
  382. except requests.exceptions.RequestException as e:
  383. logger.error(f"连接套利执行器 {ARB_EXECUTOR_URL} 失败: {e}")
  384. except Exception as e:
  385. logger.error(f"发送套利消息未知错误: {e}", exc_info=True)
  386. def update_data_for_plotly_and_table():
  387. global historical_data_points, latest_values_for_table # IN_AMOUNT_TO_QUERY
  388. fetch_time_full = time.strftime("%Y-%m-%d %H:%M:%S")
  389. fetch_time_chart = time.strftime("%H:%M:%S")
  390. # 1. MEXC: 获取 price_target_per_usdt_bid1 (例如 RATO/USDT) 和相应的 trade_value_usdt
  391. # trade_value_usdt 是指如果以 EXCHANGE_OUT_AMOUNT 的目标代币在MEXC上砸盘卖出,能获得的USDT估值
  392. mexc_data, trade_value_usdt = get_mexc_spot_price_target_usdt_bid()
  393. mexc_price_target_per_usdt_bid = mexc_data.get("price_target_per_usdt_bid1") # TARGET/USDT
  394. mexc_err = mexc_data.get("error")
  395. # price_target_per_usdt_bid1: 这是1个目标币能卖多少USDT, 即 USDT/TARGET
  396. # 所以可以直接用,不需要转换,变量名应为 mexc_price_target_per_base
  397. mexc_price_target_per_base = None
  398. if mexc_price_target_per_usdt_bid is not None and mexc_price_target_per_usdt_bid > 0:
  399. mexc_price_target_per_base = mexc_price_target_per_usdt_bid # RATO/USDT => USDT/TARGET (命名约定)
  400. elif not mexc_err and mexc_price_target_per_usdt_bid is not None:
  401. mexc_err = mexc_err or "MEXC价格为0或无效"
  402. if mexc_err or trade_value_usdt == Decimal('0'): # 如果MEXC有问题或无法确定砸盘价值,则跳过本次循环
  403. logger.warning(f"MEXC数据获取问题: {mexc_err}, trade_value_usdt: {trade_value_usdt}. 跳过本次循环。")
  404. with data_lock: # 依然更新错误信息
  405. latest_values_for_table["mexc_error"] = mexc_err
  406. latest_values_for_table["oo_error"] = latest_values_for_table.get("oo_error") # 保持上次的oo_error
  407. latest_values_for_table["last_updated"] = fetch_time_full
  408. time.sleep(REFRESH_INTERVAL_SECONDS)
  409. return
  410. # 2. 确定链上查询的输入金额 (USDT)
  411. # 使用 MEXC 卖出 EXCHANGE_OUT_AMOUNT 个目标币能得到的USDT数量 (trade_value_usdt)
  412. # 作为链上购买目标币时花费的USDT数量 (in_amount_to_query_human)
  413. # 交易所賣了多少
  414. global in_amount_to_query_human
  415. with in_amount_lock:
  416. in_amount_to_query_human = trade_value_usdt.quantize(Decimal('1e-2'), rounding=ROUND_DOWN) # 保留两位小数
  417. in_amount_copy = Decimal(str(in_amount_to_query_human))
  418. if in_amount_copy <= Decimal('0'):
  419. logger.warning(f"计算出的链上查询金额为0或负数 ({in_amount_copy} USDT),跳过。trade_value_usdt: {trade_value_usdt}")
  420. time.sleep(REFRESH_INTERVAL_SECONDS)
  421. return
  422. # 3. 获取链上价格:用 in_amount_copy 这么多的USDT去买目标币,能买到多少,以及价格 (USDT/TARGET)
  423. oo_data, chain_swap_full_response = get_chain_price_vs_target_currency()
  424. oo_price_usdt_per_target = oo_data.get("price_base_per_target") # USDT/TARGET
  425. oo_err = oo_data.get("error")
  426. # 4. 计算百分比差异
  427. # diff = (MEXC卖价 - 链上买价) / 链上买价
  428. diff_oo_vs_mexc_bid1_pct = calculate_percentage_diff(
  429. mexc_price_target_per_base, # MEXC卖价 (USDT/TARGET)
  430. oo_price_usdt_per_target # 链上买价 (USDT/TARGET)
  431. )
  432. # 5. 计算实际利润额 (以USDT计价)
  433. # 利润 = (MEXC每目标币卖价 - 链上每目标币买价) * 链上买入的目标币数量
  434. # 链上买入的目标币数量 = in_amount_copy / oo_price_usdt_per_target
  435. # 简化:利润百分比 * 投入的USDT金额
  436. actual_profit_usdt = None
  437. if diff_oo_vs_mexc_bid1_pct is not None and oo_price_usdt_per_target is not None and oo_price_usdt_per_target > 0:
  438. # 方案A: 基于百分比和投入金额
  439. actual_profit_usdt = diff_oo_vs_mexc_bid1_pct * in_amount_copy
  440. # 6. 满足利润条件,发送套利消息, PROFIT_LIMIT + 3的3是提前計算的成本,否則一直提交
  441. global mode
  442. if actual_profit_usdt is not None and actual_profit_usdt > PROFIT_LIMIT + 3 and mode == 'trade':
  443. if chain_swap_full_response: # 确保有完整的链上数据
  444. send_arb_msg(actual_profit_usdt, chain_swap_full_response, mexc_price_target_per_base, in_amount_copy)
  445. else:
  446. logger.warning("利润满足但链上数据不完整,无法发送套利消息。")
  447. # 數據一秒更新一次
  448. global last_insert_time
  449. with last_insert_time_lock:
  450. if time.time() - 1 > last_insert_time:
  451. last_insert_time = time.time()
  452. current_point = {
  453. "time": fetch_time_chart,
  454. "oo_price_usdt_per_target": float(oo_price_usdt_per_target) if oo_price_usdt_per_target else None,
  455. "mexc_price_target_per_base": float(mexc_price_target_per_base) if mexc_price_target_per_base else None,
  456. "diff_oo_vs_mexc_bid1": float(diff_oo_vs_mexc_bid1_pct) if diff_oo_vs_mexc_bid1_pct is not None else None,
  457. "profit_value": float(actual_profit_usdt) if actual_profit_usdt is not None else None, # 新增:用于图表的实际利润额
  458. }
  459. with data_lock:
  460. historical_data_points.append(current_point)
  461. latest_values_for_table["oo_price_usdt_per_target"] = f"{oo_price_usdt_per_target:.8f}" if oo_price_usdt_per_target else "N/A"
  462. latest_values_for_table["mexc_price_target_per_base"] = f"{mexc_price_target_per_base:.8f}" if mexc_price_target_per_base else "N/A"
  463. latest_values_for_table["diff_oo_vs_mexc_bid1_percentage"] = f"{diff_oo_vs_mexc_bid1_pct:+.4%}" if diff_oo_vs_mexc_bid1_pct is not None else "N/A" # 显示为百分比
  464. latest_values_for_table["profit_value_for_table"] = f"{actual_profit_usdt:.2f} {BASE_CURRENCY_SYMBOL}" if actual_profit_usdt is not None else "N/A" # 新增
  465. latest_values_for_table["oo_error"] = oo_err
  466. latest_values_for_table["mexc_error"] = mexc_err
  467. latest_values_for_table["last_updated"] = fetch_time_full
  468. latest_values_for_table["in_amount_for_query_display"] = f"{in_amount_copy:.2f} {BASE_CURRENCY_SYMBOL}" if in_amount_copy > 0 else "N/A"
  469. # if oo_err or mexc_err :
  470. # logger.warning(f"{fetch_time_chart} Errors: OO:{oo_err}, MEXC:{mexc_err}")
  471. @app.route('/')
  472. def index_plotly():
  473. return render_template('index_plotly_dynamic_ok.html',
  474. target_asset=TARGET_ASSET_SYMBOL,
  475. base_asset=BASE_CURRENCY_SYMBOL,
  476. mexc_pair_usdt=MEXC_TARGET_PAIR_USDT,
  477. refresh_interval_ms=REFRESH_INTERVAL_SECONDS * 1000)
  478. @app.route('/table-data')
  479. def get_table_data():
  480. with data_lock:
  481. # logger.info(f"Table data requested: {latest_values_for_table}")
  482. return jsonify(latest_values_for_table)
  483. @app.route('/plotly-chart-data')
  484. def get_plotly_chart_data():
  485. with data_lock:
  486. points = list(historical_data_points)
  487. # logger.info(f"Chart data requested, {len(points)} points.")
  488. if not points:
  489. fig = go.Figure() # Create an empty figure
  490. fig.update_layout(title_text="暂无数据")
  491. empty_json = json.loads(json.dumps(fig, cls=PlotlyJSONEncoder))
  492. return jsonify({
  493. "price_chart": empty_json,
  494. "diff_chart": empty_json,
  495. "profit_chart": empty_json # 新增:空利润图表
  496. })
  497. times = [p['time'] for p in points]
  498. display_target_asset = latest_values_for_table["target_asset_symbol_for_display"]
  499. display_base_asset = latest_values_for_table["base_currency_symbol_for_display"]
  500. common_xaxis_config = dict(title='时间')
  501. common_legend_config = dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
  502. # if len(times) > 1: # Plotly handles autorange for single point ok
  503. # common_xaxis_config['range'] = [times[0], times[-1]]
  504. # else:
  505. common_xaxis_config['autorange'] = True
  506. # Price Chart
  507. fig_prices = go.Figure()
  508. fig_prices.add_trace(go.Scatter(x=times, y=[p['oo_price_usdt_per_target'] for p in points], mode='lines',
  509. name=f'Okx ({display_base_asset}/{display_target_asset})',
  510. line=dict(color='rgb(75, 192, 192)'),
  511. hovertemplate=f'<b>Okx链上价</b><br>价格: %{{y:.8f}} {display_base_asset}<extra></extra>',
  512. connectgaps=True)) # 处理None值不画线
  513. fig_prices.add_trace(go.Scatter(x=times, y=[p['mexc_price_target_per_base'] for p in points], mode='lines',
  514. name=f'MEXC卖1价 ({display_base_asset}/{display_target_asset})',
  515. line=dict(color='rgb(255, 99, 132)', dash='dash'),
  516. hovertemplate=f'<b>MEXC卖出价</b><br>价格: %{{y:.8f}} {display_base_asset}<extra></extra>',
  517. connectgaps=True))
  518. fig_prices.update_layout(title_text=f'{display_base_asset}/{display_target_asset} 价格历史',
  519. xaxis=common_xaxis_config.copy(),
  520. yaxis_title=f'价格 (1 {display_target_asset} = X {display_base_asset})',
  521. legend_title_text='平台',
  522. legend=common_legend_config.copy(), hovermode='x unified',
  523. margin=dict(l=70, r=30, t=80, b=50))
  524. # Percentage Difference Chart
  525. fig_diffs = go.Figure()
  526. fig_diffs.add_trace(
  527. go.Scatter(x=times, y=[p['diff_oo_vs_mexc_bid1'] for p in points], mode='lines', name=f'价差百分比 (MEXC卖价 vs Okx买价)',
  528. line=dict(color='rgb(255, 159, 64)'),
  529. hovertemplate=f'<b>(MEXC卖价-Okx买价)/Okx买价</b><br>百分比: %{{y:+.4%}}<extra></extra>', # 显示为百分比
  530. connectgaps=True))
  531. fig_diffs.update_layout(title_text=f'价差百分比历史曲线',
  532. xaxis=common_xaxis_config.copy(),
  533. yaxis_title='价差百分比', legend_title_text='对比', legend=common_legend_config.copy(),
  534. yaxis_zeroline=True, hovermode='x unified', margin=dict(l=70, r=30, t=80, b=50),
  535. yaxis_tickformat=".4%") # y轴也显示为百分比
  536. # --- 新增 Profit Chart ---
  537. fig_profit = go.Figure()
  538. fig_profit.add_trace(
  539. go.Scatter(x=times, y=[p['profit_value'] for p in points], mode='lines', name=f'预估利润 ({display_base_asset})',
  540. line=dict(color='rgb(153, 102, 255)'), # 紫色
  541. hovertemplate=f'<b>预估利润</b><br>金额: %{{y:,.2f}} {display_base_asset}<extra></extra>', # 利润金额,保留2位小数
  542. connectgaps=True))
  543. fig_profit.update_layout(title_text=f'预估利润历史 ({display_base_asset})',
  544. xaxis=common_xaxis_config.copy(),
  545. yaxis_title=f'利润 ({display_base_asset})',
  546. legend_title_text='利润额',
  547. legend=common_legend_config.copy(),
  548. yaxis_zeroline=True, hovermode='x unified',
  549. margin=dict(l=70, r=30, t=80, b=50),
  550. yaxis_tickformat="$,.2f") # y轴格式化为货币
  551. combined_figure_data = {
  552. "price_chart": json.loads(json.dumps(fig_prices, cls=PlotlyJSONEncoder)),
  553. "diff_chart": json.loads(json.dumps(fig_diffs, cls=PlotlyJSONEncoder)),
  554. "profit_chart": json.loads(json.dumps(fig_profit, cls=PlotlyJSONEncoder)) # 新增
  555. }
  556. return jsonify(combined_figure_data)
  557. def handle_ctrl_c(sig, frame):
  558. print("\nCtrl+C received. Stopping all clients...")
  559. for client_instance in active_clients:
  560. client_instance.stop()
  561. time.sleep(2)
  562. if __name__ == "__main__":
  563. signal.signal(signal.SIGINT, handle_ctrl_c)
  564. signal.signal(signal.SIGTERM, handle_ctrl_c)
  565. logger.info("Application starting. Press Ctrl+C to stop.")
  566. # 加载一次全量深度
  567. with depth_lock:
  568. depth = get_depth(MEXC_TARGET_PAIR_USDT)
  569. # logger.info(f"depth:\n{pformat(depth, indent=2)}")
  570. logger.info("启动depth数据更新线程...")
  571. updater_thread = threading.Thread(target=update_depth_data_periodically, daemon=True)
  572. updater_thread.start()
  573. logger.info("启动chain数据更新线程...")
  574. updater_thread = threading.Thread(target=update_chain_data_periodically, daemon=True)
  575. updater_thread.start()
  576. # 创建并启动 WebSocket 客户端,订阅增量深度
  577. # 注意:选择一个有交易量的币对,例如 BTCUSDT
  578. SYMBOL_TO_MONITOR = MEXC_TARGET_PAIR_USDT.replace('_', '')
  579. # 确保订阅的是增量深度频道
  580. depth_subscriptions = [f"spot@public.increase.depth.v3.api@{SYMBOL_TO_MONITOR}"]
  581. depth_client = MexcWSClient(
  582. symbol=SYMBOL_TO_MONITOR,
  583. on_message_callback=update_depth_from_incremental, # 所有消息都给这个处理器
  584. subscriptions=depth_subscriptions
  585. )
  586. depth_client.start()
  587. parser = argparse.ArgumentParser(description='套利监控和交易脚本。')
  588. parser.add_argument('--mode',
  589. required=True,
  590. choices=['trade', 'view'], # 限制可选值
  591. help='运行模式: "trade" (执行交易) 或 "view" (仅观察)')
  592. try:
  593. args = parser.parse_args()
  594. mode = args.mode
  595. logger.info(f"脚本运行模式为: {mode}")
  596. logger.info("应用启动...")
  597. logger.info(f"目标资产: {TARGET_ASSET_SYMBOL}, 计价货币: {BASE_CURRENCY_SYMBOL}")
  598. # IN_AMOUNT_TO_QUERY 会动态变化,初始值从配置读取,但循环中会基于MEXC的trade_value更新
  599. # logger.info(f"链上查询初始金额: {arb['IN_AMOUNT_TO_QUERY']} {BASE_CURRENCY_SYMBOL} -> {TARGET_ASSET_SYMBOL}")
  600. logger.info(f"MEXC期望卖出量 (用于计算深度和价值): {EXCHANGE_OUT_AMOUNT} {TARGET_ASSET_SYMBOL}")
  601. logger.info(f"利润阈值: {PROFIT_LIMIT} {BASE_CURRENCY_SYMBOL}")
  602. logger.info(f"MEXC现货交易对: {MEXC_TARGET_PAIR_USDT}")
  603. # data_thread = threading.Thread(target=update_data_for_plotly_and_table, daemon=True)
  604. # data_thread.start()
  605. port = arb.get("PORT", 5001) # 从配置获取端口,如果没有则默认5001
  606. logger.info(f"Flask 服务将在 http://0.0.0.0:{port} 上运行 (刷新间隔: {REFRESH_INTERVAL_SECONDS}s)")
  607. app.run(debug=False, host='0.0.0.0', port=port, use_reloader=False)
  608. except SystemExit: # argparse 在参数错误时会引发 SystemExit
  609. # parser.print_help() # argparse 默认会打印帮助信息
  610. logger.info("脚本因参数错误而退出。请提供 '--mode' 参数 ('trade' 或 'view')。")
  611. except Exception as e:
  612. logger.critical(f"主程序发生严重错误: {e}", exc_info=True)