s_erc20_to_mexc.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. import time
  2. import traceback
  3. import copy
  4. import os
  5. import requests
  6. import json
  7. from mexc_client import MexcClient
  8. from decimal import Decimal, ROUND_DOWN
  9. from as_utils import add_state_flow_entry
  10. from decimal_utils import decimal_to_string_no_scientific
  11. from checker.logger_config import get_logger
  12. from pprint import pformat
  13. from pprint import pprint
  14. mexc = MexcClient()
  15. # 配置日志
  16. logger = get_logger('as')
  17. class ArbitrageProcess:
  18. def __init__(self, process_item,
  19. core_data, core_lock,
  20. mexc_data, mexc_lock,
  21. ):
  22. """
  23. 初始化套利流程
  24. Args:
  25. process_item: 信號發送端傳入的原始參數
  26. """
  27. '''
  28. process_item
  29. {
  30. 'cexPrice': '0.0104700000',
  31. 'dexPrice': '0.01030220134289945035466016829',
  32. 'pct': '0',
  33. 'closeLimit': '-1.000',
  34. 'openLimit': '0.000',
  35. 'exchangeOutAmount': '3000.000000000000000000',
  36. 'queryPriceUrl': '127.0.0.1:7777/table-data',
  37. 'strategy': 'erc20_to_mexc',
  38. 'symbol': 'APETH_USDT'
  39. }
  40. '''
  41. self.core_data = core_data
  42. self.core_lock = core_lock
  43. self.mexc_data = mexc_data
  44. self.mexc_lock = mexc_lock
  45. # symbol轉大寫
  46. self.symbol = process_item['symbol'].upper()
  47. self.coin = self.symbol.split('_')[0]
  48. self.base_coin = self.symbol.split('_')[1]
  49. # 其它参数
  50. self.cex_price = Decimal(process_item['cexPrice'])
  51. self.dex_price = Decimal(process_item['dexPrice'])
  52. self.pct = Decimal(process_item['pct'])
  53. self.open_limit = Decimal(process_item['openLimit'])
  54. self.close_limit = Decimal(process_item['closeLimit'])
  55. self.query_price_url = process_item['queryPriceUrl']
  56. # 留档
  57. self.process_item = process_item
  58. # 存储当前套利交易的细节信息,例如买入数量、价格等
  59. self.sell_amount = Decimal(process_item['exchangeOutAmount']) # 交易所卖出量
  60. self.sell_price = Decimal(0) # 实际卖出价格
  61. self.sell_value = Decimal(0) # 实际卖出价值
  62. self.buy_price = Decimal(0)
  63. self.buy_amount = Decimal(0)
  64. self.buy_value = Decimal(0)
  65. self.actual_profit = Decimal(0) # 實際利潤
  66. # 定义可能的状态
  67. self.STATE_IDLE = "IDLE"
  68. self.STATE_CHECK = "CHECK" # 检查余额、估算gas等
  69. self.STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE" # 正在中心化交易所卖出现货
  70. self.STATE_WAITING_PCT_CONVER = "WAITING_PCT_CONVER" # 等待价差回归
  71. self.STATE_BUYING_ON_EXCHANGE = "BUYING_ON_EXCHANGE" # 正在中心化交易所买回现货
  72. self.STATE_COMPLETED = "COMPLETED" # 套利流程完成
  73. self.STATE_REJECT = "REJECT" # 套利被程序拒绝
  74. self.STATE_FAILED = "FAILED" # 套利流程失败
  75. self.STATES = [
  76. self.STATE_IDLE,
  77. self.STATE_CHECK,
  78. self.STATE_SELLING_ON_EXCHANGE,
  79. self.STATE_WAITING_PCT_CONVER,
  80. self.STATE_BUYING_ON_EXCHANGE,
  81. self.STATE_COMPLETED,
  82. self.STATE_REJECT,
  83. self.STATE_FAILED,
  84. ]
  85. # 所有前置信息获取都没有问题的话就等待开机信号
  86. self.current_state = self.STATE_IDLE
  87. # --------------------------------------- 获取交易规则 ---------------------------------------
  88. exchange_info_params = {
  89. "symbols": self.symbol.replace('_', '')
  90. }
  91. exchange_info_rst = mexc.market.get_exchangeInfo(exchange_info_params)
  92. # 返回值检查
  93. if 'symbols' not in exchange_info_rst or len(exchange_info_rst['symbols']) != 1:
  94. params_formated = pformat(exchange_info_params, indent=2)
  95. info_formated = pformat(exchange_info_rst, indent=2)
  96. msg = f'获取交易规则时出现错误\n{exchange_info_params}\n{info_formated}'
  97. logger.error(msg)
  98. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  99. self.current_state = self.STATE_FAILED
  100. return
  101. # 返回的交易对信息核对]
  102. exchange_info = exchange_info_rst['symbols'][0]
  103. if exchange_info['symbol'].upper() != self.symbol.replace('_', ''):
  104. info_formated = pformat(exchange_info, indent=2)
  105. msg = f'获取到的交易规则与交易币对无关\n{info_formated}'
  106. logger.error(msg)
  107. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  108. self.current_state = self.STATE_FAILED
  109. return
  110. # 精度取得, 假如是RATOUSDT这个交易对的话
  111. self.coin_asset_precision = Decimal(f'1e-{exchange_info['baseAssetPrecision']}') # 这是RATO的精度
  112. self.base_coin_asset_precision = Decimal(f'1e-{exchange_info['quoteAssetPrecision']}') # 这是USDT的精度
  113. self.price_precision = Decimal(f'1e-{exchange_info['quotePrecision']}') # 这是价格的精度
  114. # 格式化价格
  115. self.cex_price = self.cex_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  116. self.dex_price = self.dex_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  117. # 数量
  118. self.sell_amount = self.sell_amount.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  119. def _set_state(self, state):
  120. """
  121. 设置系统状态,并打印日志
  122. """
  123. if state in self.STATES:
  124. logger.info(f"状态变更 {self.current_state} -> {state}")
  125. logger.info('')
  126. self.current_state = state
  127. else:
  128. logger.error(f"尝试设置无效状态 {state}")
  129. def run_arbitrage_step(self):
  130. """
  131. 根据当前状态执行套利流程的下一步
  132. 这是一个周期性调用的函数,例如在主循环中调用
  133. """
  134. if self.current_state == self.STATE_CHECK:
  135. self._execute_check()
  136. elif self.current_state == self.STATE_SELLING_ON_EXCHANGE:
  137. self._execute_sell_on_exchange()
  138. elif self.current_state == self.STATE_WAITING_PCT_CONVER:
  139. self._execute_wait_pct_cover()
  140. elif self.current_state == self.STATE_BUYING_ON_EXCHANGE:
  141. self._execute_buy_on_exchange()
  142. elif self.current_state == self.STATE_COMPLETED:
  143. msg = "套利流程成功完成!"
  144. logger.info(msg)
  145. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  146. elif self.current_state == self.STATE_REJECT:
  147. msg = "套利流程被程序拒绝"
  148. logger.error(msg)
  149. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  150. elif self.current_state == self.STATE_FAILED:
  151. msg = "套利流程失败!"
  152. logger.error(msg)
  153. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  154. def get_local_data_no_params(self, url):
  155. """
  156. 请求本地接口, 不携带参数, 并将返回值解析为 JSON。
  157. Args:
  158. url (str): 本地接口的 URL。
  159. Returns:
  160. dict or None: 如果请求成功且返回是有效的 JSON, 则返回 JSON 数据(Python 字典)。
  161. 否则, 返回 None。
  162. """
  163. try:
  164. # 发送 GET 请求到指定的 URL, 不携带参数
  165. response = requests.get(url)
  166. # 检查 HTTP 状态码, 200 表示成功
  167. response.raise_for_status()
  168. # 尝试将响应内容解析为 JSON
  169. # requests 库提供了一个方便的方法 .json() 来自动处理 JSON 解析和编码问题
  170. data = response.json()
  171. return data
  172. except requests.exceptions.HTTPError as err_http:
  173. logger.error(f"HTTP 错误发生: {err_http}") # 例如 404 Not Found, 500 Internal Server Error
  174. return None
  175. except requests.exceptions.ConnectionError as err_conn:
  176. logger.error(f"连接错误发生: {err_conn}") # 例如本地接口未运行
  177. return None
  178. except requests.exceptions.Timeout as err_timeout:
  179. logger.error(f"请求超时: {err_timeout}")
  180. return None
  181. except requests.exceptions.RequestException as err:
  182. logger.error(f"发生未知错误: {err}")
  183. return None
  184. except json.JSONDecodeError as err_json:
  185. logger.error(f"无法解析 JSON: {err_json}")
  186. logger.error(f"响应内容可能不是有效的 JSON: \n{response.text}")
  187. return None
  188. def _execute_check(self):
  189. """
  190. 前置检查,防止低能错误
  191. """
  192. try:
  193. # step1,檢查交易所的餘額是否夠用
  194. # 处理精度
  195. pseudo_amount_to_sell = self.sell_amount
  196. msg = f"套利开始, dex {self.dex_price}, cex {self.cex_price}, pct {self.pct}"
  197. logger.info(msg)
  198. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  199. # 交易所套保余额判断
  200. with self.mexc_lock:
  201. balances = self.mexc_data['account_info']['balances']
  202. for balance in balances:
  203. if balance['asset'] == self.coin:
  204. if Decimal(balance['free']) < pseudo_amount_to_sell:
  205. msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出 {pseudo_amount_to_sell}, 不能触发交易。"
  206. logger.info(msg)
  207. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  208. self._set_state(self.STATE_REJECT)
  209. return
  210. else:
  211. msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出 {pseudo_amount_to_sell}, 余额校验通过(可以交易)。"
  212. logger.info(msg)
  213. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  214. break
  215. # final, 設定交易狀態,開始交易
  216. self._set_state(self.STATE_SELLING_ON_EXCHANGE)
  217. except Exception as e:
  218. exc_traceback = traceback.format_exc()
  219. msg = f"前置檢查未通過\n{exc_traceback}"
  220. logger.error(msg)
  221. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  222. self._set_state(self.STATE_REJECT)
  223. # traceback.logger.error_exc()
  224. # 执行卖出, 使用超价单
  225. def _execute_sell_on_exchange(self):
  226. """
  227. 在中心化交易所卖出现货
  228. """
  229. msg = "执行 中心化交易所卖出现货..."
  230. logger.info(msg)
  231. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  232. try:
  233. self.already_sold_amount = Decimal(0)
  234. # 第一步直接卖出,这个数量用固定数量
  235. pseudo_amount_to_sell = self.sell_amount - self.already_sold_amount
  236. # 处理精度
  237. pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  238. price_for_api = self.dex_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  239. price_for_api = decimal_to_string_no_scientific(price_for_api)
  240. # 初始化 quantity 变量
  241. quantity_for_api = None
  242. # 用求余法判断是否是整数
  243. if pseudo_amount_to_sell % 1 == 0:
  244. # 如果是整数, 转换为 int 类型。某些API可能只接受整数交易对的整数数量
  245. quantity_for_api = int(pseudo_amount_to_sell)
  246. else:
  247. # 如果是非整数, 转换为 float 类型。这是最常见的API数量类型
  248. quantity_for_api = float(pseudo_amount_to_sell)
  249. order_params = {
  250. "symbol": self.symbol.replace('_', ''),
  251. "side": "SELL",
  252. "type": "LIMIT",
  253. "price": price_for_api,
  254. "quantity": quantity_for_api,
  255. }
  256. order_params_formated = pformat(order_params, indent=2)
  257. exchange_sell_order = mexc.trade.post_order(order_params)
  258. exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
  259. msg = f"交易所现货卖出订单已发送 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
  260. logger.info(msg)
  261. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  262. if 'orderId' not in exchange_sell_order:
  263. msg = '下单失败, 请检查参数及返回值'
  264. logger.error(msg)
  265. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  266. self._set_state(self.STATE_FAILED)
  267. return
  268. # 查询交易所订单状态
  269. self.exchange_sell_order_id = exchange_sell_order['orderId']
  270. waiting_times = 30
  271. order = None
  272. while waiting_times > 0:
  273. # 最后一次尝试就撤单了不搞了
  274. if waiting_times == 1:
  275. params = {
  276. "symbol": self.symbol.replace('_', ''),
  277. "orderId": self.exchange_sell_order_id
  278. }
  279. delete_order = mexc.trade.delete_order(params)
  280. delete_order_formated = pformat(delete_order, indent=2)
  281. msg = f"【WARNING】交易所现货卖出未完全成交\n order: {delete_order_formated}"
  282. logger.warning(msg)
  283. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  284. time.sleep(5)
  285. # 获取订单状态, 直到完全成交或超时
  286. params = {
  287. "symbol": self.symbol.replace('_', ''),
  288. "orderId": self.exchange_sell_order_id
  289. }
  290. order = mexc.trade.get_order(params)
  291. if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
  292. # 以实际成交数量为准
  293. money = Decimal(order['cummulativeQuoteQty'])
  294. self.already_sold_amount = self.already_sold_amount + Decimal(order['executedQty'])
  295. self.sell_value = self.sell_value + money
  296. self.sell_price = self.sell_value / self.already_sold_amount
  297. self.sell_price = self.sell_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  298. break
  299. else:
  300. time.sleep(1)
  301. waiting_times = waiting_times - 1
  302. order_formated = pformat(order, indent=2)
  303. msg = f"交易所现货卖出订单已完成, 价格 {self.sell_price}, sell_value: {self.sell_value}\n order: {order_formated}"
  304. logger.info(msg)
  305. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  306. if self.sell_value > 2:
  307. msg = 'mexc现货卖出流程完成'
  308. logger.info(msg)
  309. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  310. self._set_state(self.STATE_BUYING_ON_EXCHANGE)
  311. else:
  312. msg = 'mexc现货卖出流程失败, 成交价值小于2'
  313. logger.info(msg)
  314. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  315. self._set_state(self.STATE_FAILED)
  316. except Exception as e:
  317. exc_traceback = traceback.format_exc()
  318. msg = f"交易所现货卖出下单失败\n{exc_traceback}"
  319. logger.error(msg)
  320. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  321. self._set_state(self.STATE_FAILED)
  322. # traceback.logger.error_exc()、
  323. def _execute_buy_on_exchange(self):
  324. """
  325. 执行回购操作
  326. """
  327. msg = f"正在回购, 目标回购数量 {self.already_sold_amount}, 金额: {self.sell_value}"
  328. logger.info(msg)
  329. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  330. try:
  331. exchange_buy_order = None
  332. order_error_times = 0
  333. last_order_price = Decimal(0)
  334. dex_price = Decimal(0)
  335. self.already_bought_amount = Decimal(0)
  336. while order_error_times < 10:
  337. time.sleep(0.5)
  338. try:
  339. table_data = self.get_local_data_no_params(self.query_price_url)
  340. # 数据合法性
  341. if table_data is None or 'dex_price' not in table_data:
  342. continue
  343. dex_price = Decimal(table_data['dex_price'])
  344. dex_price = dex_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  345. ready_buy_price = dex_price * (Decimal(1) - self.close_limit)
  346. ready_buy_price = ready_buy_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  347. # 准备购入的价值, 如果小于2u就不要提交了
  348. pseudo_value_to_buy = ready_buy_price * (self.already_sold_amount - self.already_bought_amount)
  349. if pseudo_value_to_buy < 2:
  350. break
  351. # 没有订单时的逻辑
  352. if exchange_buy_order is None:
  353. # 交易所U余额判断
  354. with self.mexc_lock:
  355. balances = self.mexc_data['account_info']['balances']
  356. for balance in balances:
  357. if balance['asset'] == self.base_coin:
  358. free_balance = Decimal(balance['free'])
  359. pseudo_value_to_buy = min(free_balance, pseudo_value_to_buy)
  360. if pseudo_value_to_buy < Decimal('2'):
  361. msg = f"交易所剩余{self.base_coin}: {free_balance}, 小于2, 不能触发回购交易。"
  362. logger.info(msg)
  363. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  364. self._set_state(self.STATE_FAILED)
  365. return
  366. else:
  367. msg = f"交易所剩余{self.base_coin}: {free_balance}, 准备使用 {pseudo_value_to_buy}, fp {dex_price}, 挂单价格{ready_buy_price}, 余额校验通过。"
  368. logger.info(msg)
  369. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  370. break
  371. # 实际能购入的数量(可能会亏损导致买不回来, 所以要考虑实际有多少money)
  372. quantity_for_api = pseudo_value_to_buy / ready_buy_price
  373. quantity_for_api = quantity_for_api.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  374. # 用求余法判断是否是整数
  375. if quantity_for_api % 1 == 0:
  376. # 如果是整数, 转换为 int 类型。某些API可能只接受整数交易对的整数数量
  377. quantity_for_api = int(quantity_for_api)
  378. else:
  379. # 如果是非整数, 转换为 float 类型。这是最常见的API数量类型
  380. quantity_for_api = float(quantity_for_api)
  381. price_for_api = decimal_to_string_no_scientific(ready_buy_price)
  382. order_params = {
  383. "symbol": self.symbol.replace('_', ''),
  384. "side": "BUY",
  385. "type": "LIMIT",
  386. "price": price_for_api,
  387. "quantity": quantity_for_api,
  388. }
  389. order_params_formated = pformat(order_params, indent=2)
  390. exchange_buy_order = mexc.trade.post_order(order_params)
  391. exchange_buy_order_formated = pformat(exchange_buy_order, indent=2)
  392. if 'orderId' not in exchange_buy_order:
  393. table_data_formated = pformat(table_data, indent=2)
  394. msg = f"交易所现货买入订单发送失败 \n params:{order_params_formated} \n rst: {exchange_buy_order_formated} \n table: {table_data_formated}"
  395. logger.error(msg)
  396. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  397. exchange_buy_order = None
  398. order_error_times = order_error_times + 1
  399. else:
  400. self.exchange_buy_order_id = exchange_buy_order['orderId']
  401. last_order_price = ready_buy_price
  402. # 有订单时的逻辑
  403. else:
  404. # 获取订单状态, 直到完全成交或超时
  405. params = {
  406. "symbol": self.symbol.replace('_', ''),
  407. "orderId": self.exchange_buy_order_id
  408. }
  409. order = mexc.trade.get_order(params)
  410. # 主要判断成交或取消了的订单
  411. if order['status'] in ["FILLED", "PARTIALLY_CANCELED", "CANCELED"]:
  412. # 以实际成交价值为准
  413. money = Decimal(order['cummulativeQuoteQty'])
  414. # 实际成交价值大于0才计算
  415. if money > Decimal(0):
  416. order_formated = pformat(order, indent=2)
  417. logger.info(f"检测到有成交 \n {order_formated}")
  418. self.already_bought_amount = self.already_bought_amount + Decimal(order['executedQty'])
  419. self.buy_value = self.buy_value + money
  420. self.buy_price = self.buy_value / self.already_bought_amount
  421. self.buy_price = self.buy_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  422. exchange_buy_order = None
  423. # 如果没有成交或取消则判断是否达到取消条件了, 这里面不能置空
  424. elif abs(Decimal(1) - last_order_price / ready_buy_price) > Decimal(0.0005):
  425. params = {
  426. "symbol": self.symbol.replace('_', ''),
  427. "orderId": self.exchange_buy_order_id
  428. }
  429. _deleteed_order = mexc.trade.delete_order(params)
  430. # deleteed_order_formated = pformat(_deleteed_order, indent=2)
  431. # msg = f"【WARNING】价格变化, 重新挂单, order price {last_order_price}, dex_price {dex_price} \n order: {deleteed_order_formated}"
  432. # logger.warning(msg)
  433. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  434. except Exception as e:
  435. exc_traceback = traceback.format_exc()
  436. msg = f"请求价格接口时出现错误\n{exc_traceback}"
  437. logger.error(msg)
  438. # diff 仍然代表未买回的数量, 非常重要, 需要记录
  439. diff = self.already_sold_amount - self.already_bought_amount
  440. unrealized_cost = diff * dex_price # 使用最后一次获取到的市价 dex_price 更合适
  441. # 已实现的利润 = 总卖出额 - 总买入额
  442. realized_profit = (self.sell_value - self.buy_value) - unrealized_cost
  443. realized_profit = realized_profit.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  444. if diff > 0:
  445. # 如果有未买回的部分, 将其与最后一次的市价相乘, 作为 "浮动亏损" 或 "未平仓成本" 单独记录
  446. msg = f"套利流程完成, 但有 {diff} 的数量未回补。已实现利润: {realized_profit}, 未平仓成本估算: {unrealized_cost} (基于价格 {dex_price})"
  447. else:
  448. msg = f"套利流程完成, 全部回补。最终利润: {realized_profit}, 总卖值: {self.sell_value}, 总买值: {self.buy_value}"
  449. self.process_item['profit'] = realized_profit
  450. logger.info(msg)
  451. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  452. self._set_state(self.STATE_COMPLETED)
  453. except Exception as e:
  454. exc_traceback = traceback.format_exc()
  455. msg = f"等待价差回归失败\n{exc_traceback}"
  456. logger.error(msg)
  457. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  458. self._set_state(self.STATE_FAILED)