s_mexc_to_erc20.py 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. import time
  2. import traceback
  3. import copy
  4. import os
  5. from web3_py_client import EthClient
  6. from mexc_client import MexcClient
  7. from decimal import Decimal, ROUND_DOWN
  8. from as_utils import add_state_flow_entry
  9. from checker.logger_config import get_logger
  10. from pprint import pformat
  11. web3 = EthClient()
  12. mexc = MexcClient()
  13. # 配置日志
  14. logger = get_logger('as')
  15. class ArbitrageProcess:
  16. def __init__(self, gas_limit_multiplier, gas_price_multiplier, process_item,
  17. core_data, core_lock,
  18. pending_data, pending_lock,
  19. mexc_data, mexc_lock,
  20. ):
  21. """
  22. 初始化套利流程
  23. Args:
  24. gas_limit_multiplier: gas limit倍数, 一般都不加倍
  25. gas_price_multiplier: gas price倍数, 可以提高交易成功率
  26. process_item: 信號發送端傳入的原始參數
  27. """
  28. self.NETWORK = 'ETH'
  29. tx = process_item['tx']
  30. tx.pop('gasPrice', None)
  31. tx.pop('value', None)
  32. tx.pop('minReceiveAmount', None)
  33. tx.pop('slippage', None)
  34. tx.pop('maxSpendAmount', None)
  35. tx.pop('signatureData', None)
  36. self.core_data = core_data
  37. self.core_lock = core_lock
  38. self.pending_data = pending_data
  39. self.pending_lock = pending_lock
  40. self.mexc_data = mexc_data
  41. self.mexc_lock = mexc_lock
  42. # symbol轉大寫
  43. self.symbol = process_item['symbol'].upper()
  44. self.coin = self.symbol.split('_')[0]
  45. self.base_coin = self.symbol.split('_')[1]
  46. with self.core_lock:
  47. self.eth_price = self.core_data['eth_price']
  48. with self.mexc_lock:
  49. self.withdraw_info = copy.deepcopy(self.mexc_data['coin_info_map'][self.base_coin][self.NETWORK])
  50. self.WITHDRAW_FEE = Decimal(self.withdraw_info['withdrawFee']) # 提現手續費
  51. withdraw_info_formated = pformat(self.withdraw_info, indent=2)
  52. logger.info(f'提現信息識別, 手續費:{self.WITHDRAW_FEE}\n{withdraw_info_formated}')
  53. self.tx = tx
  54. self.profit = Decimal(process_item['profit']) # 這個利潤是實際到手利潤
  55. self.profit_limit = Decimal(process_item['profitLimit']) # 這個利潤是實際到手利潤的limit
  56. self.gas_limit_multiplier = gas_limit_multiplier
  57. self.gas_price_multiplier = gas_price_multiplier
  58. self.from_token_addr = process_item['fromToken']
  59. self.from_token_decimal = Decimal(process_item['fromTokenDecimal'])
  60. self.to_token_addr = process_item['toToken']
  61. self.to_token_decimal = Decimal(process_item['toTokenDecimal'])
  62. self.user_exchange_wallet = process_item['userExchangeWallet']
  63. self.user_wallet = process_item['userWallet']
  64. self.process_item = process_item
  65. # 存储当前套利交易的细节信息,例如买入数量、价格等
  66. self.sell_price = Decimal(0)
  67. self.buy_price = Decimal(0)
  68. self.chain_tx_hash = None # 链上买入的tx hash
  69. self.chain_usdt_use = Decimal(process_item['fromTokenAmountHuman']) # 链上usdt减少量(使用量)
  70. self.chain_buy_amount = Decimal(0) # 链上币增加量(购入量), todo, 暂用即时余额代替
  71. self.exchange_sell_amount = Decimal(process_item['exchangeOutAmount']) # 交易所卖出量
  72. self.exchange_sell_order_id = None # 交易所卖出id
  73. self.exchange_withdrawal_id = None # 交易所提现id
  74. self.exchange_withdrawal_amount = None # 交易所提现数量
  75. self.actual_profit = Decimal(0) # 實際利潤
  76. # 定义可能的状态
  77. self.STATES = [
  78. "CHECK", # 检查余额、估算gas等
  79. "SELLING_ON_EXCHANGE", # 正在中心化交易所卖出现货
  80. "WAITING_SELL_CONFIRM", # 等待现货卖出订单确认
  81. "BUYING_ON_CHAIN", # 正在链上买入
  82. "WAITING_CHAIN_CONFIRM", # 等待链上交易确认
  83. "WAITING_EXCHANGE_ROLLBACK", # 等待交易所回滚
  84. # "HEDGING_ON_EXCHANGE", # 正在中心化交易所套保
  85. # "WAITING_HEDGE_CONFIRM", # 等待套保订单确认
  86. # "TRANSFERRING_TO_EXCHANGE", # 正在向交易所转账
  87. # "CLOSING_HEDGE", # 正在平掉套保单
  88. # "WAITING_CLOSE_HEDGE_CONFIRM", # 等待平掉套保单确认
  89. "WAITING_TRANSFER_ARRIVE", # 等待交易所充值到账
  90. "TRANSFERRING_TO_CHAIN", # 正在向链上转账
  91. "WAITING_WITHDRAWAL_CONFIRM", # 等待链上提现确认
  92. "COMPLETED", # 套利流程完成
  93. "REJECT", # 套利被程序拒绝
  94. "FAILED" # 套利流程失败
  95. ]
  96. self.STATE_IDLE = "IDLE"
  97. self.STATE_CHECK = "CHECK"
  98. self.STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE"
  99. self.STATE_WAITING_SELL_CONFIRM = "WAITING_SELL_CONFIRM"
  100. self.STATE_BUYING_ON_CHAIN = "BUYING_ON_CHAIN"
  101. self.STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM"
  102. self.STATE_WAITING_EXCHANGE_ROLLBACK = "WAITING_EXCHANGE_ROLLBACK"
  103. # self.STATE_TRANSFERRING_TO_EXCHANGE = "TRANSFERRING_TO_EXCHANGE"
  104. self.STATE_WAITING_TRANSFER_ARRIVE = "WAITING_TRANSFER_ARRIVE"
  105. self.STATE_TRANSFERRING_TO_CHAIN = "TRANSFERRING_TO_CHAIN"
  106. self.STATE_WAITING_WITHDRAWAL_CONFIRM = "WAITING_WITHDRAWAL_CONFIRM"
  107. self.STATE_COMPLETED = "COMPLETED"
  108. self.STATE_REJECT = "REJECT"
  109. self.STATE_FAILED = "FAILED"
  110. self.current_state = "IDLE"
  111. def _set_state(self, state):
  112. """
  113. 设置系统状态,并打印日志
  114. """
  115. if state in self.STATES:
  116. logger.info(f"状态变更:{self.current_state} -> {state}")
  117. logger.info('')
  118. self.current_state = state
  119. else:
  120. logger.error(f"尝试设置无效状态:{state}")
  121. def run_arbitrage_step(self):
  122. """
  123. 根据当前状态执行套利流程的下一步
  124. 这是一个周期性调用的函数,例如在主循环中调用
  125. """
  126. if self.current_state == self.STATE_CHECK:
  127. self._execute_check()
  128. elif self.current_state == self.STATE_SELLING_ON_EXCHANGE:
  129. self._execute_sell_on_exchange()
  130. elif self.current_state == self.STATE_WAITING_SELL_CONFIRM:
  131. self._wait_sell_confirm()
  132. elif self.current_state == self.STATE_BUYING_ON_CHAIN:
  133. self._execute_buy_on_chain()
  134. elif self.current_state == self.STATE_WAITING_CHAIN_CONFIRM:
  135. self._wait_chain_confirm()
  136. elif self.current_state == self.STATE_WAITING_EXCHANGE_ROLLBACK:
  137. self._wait_exchange_rollback()
  138. # elif self.current_state == "TRANSFERRING_TO_EXCHANGE":
  139. # self._execute_transfer_to_exchange()
  140. elif self.current_state == self.STATE_WAITING_TRANSFER_ARRIVE:
  141. self._wait_transfer_arrive()
  142. elif self.current_state == self.STATE_TRANSFERRING_TO_CHAIN:
  143. self._execute_transfer_to_chain()
  144. elif self.current_state == self.STATE_WAITING_WITHDRAWAL_CONFIRM:
  145. self._wait_withdrawal_confirm()
  146. elif self.current_state == self.STATE_COMPLETED:
  147. msg = "套利流程成功完成!"
  148. logger.info(msg)
  149. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  150. elif self.current_state == self.STATE_REJECT:
  151. msg = "套利流程被程序拒绝"
  152. logger.error(msg)
  153. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  154. elif self.current_state == self.STATE_FAILED:
  155. msg = "套利流程失败!"
  156. logger.error(msg)
  157. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  158. def _execute_check(self):
  159. """
  160. 前置检查,防止低能错误
  161. """
  162. try:
  163. # step1,檢查交易所的餘額是否夠用
  164. # 处理精度
  165. pseudo_amount_to_sell = self.exchange_sell_amount.quantize(Decimal('1'), rounding=ROUND_DOWN)
  166. # 交易所套保余额判断
  167. with self.mexc_lock:
  168. balances = self.mexc_data['account_info']['balances']
  169. for balance in balances:
  170. if balance['asset'] == self.coin:
  171. if Decimal(balance['free']) < pseudo_amount_to_sell:
  172. msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 不能触发套保交易。"
  173. logger.info(msg)
  174. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  175. self._set_state(self.STATE_REJECT)
  176. return
  177. else:
  178. msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 余额校验通过(可以套保)。"
  179. logger.info(msg)
  180. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  181. break
  182. # step2,估算gas
  183. logger.info("獲取區塊信息")
  184. with self.core_lock:
  185. latest_block = copy.deepcopy(self.core_data['block'])
  186. self.tx['maxPriorityFeePerGas'] = int(int(self.tx['maxPriorityFeePerGas']) * self.gas_price_multiplier)
  187. self.tx['maxFeePerGas'] = int(int(latest_block['baseFeePerGas']) * 2 + self.tx['maxPriorityFeePerGas'])
  188. gas_price = Decimal(self.tx['maxPriorityFeePerGas'] + self.tx['maxFeePerGas'])
  189. gas_price_gwei = gas_price / Decimal('1e9')
  190. gas_price_gwei = gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
  191. tx_formated = pformat(self.tx, indent=2)
  192. logger.info(f"鏈上各種校驗\n{tx_formated}")
  193. estimated_gas_origin = web3.w3.eth.estimate_gas(self.tx)
  194. estimated_gas = int(estimated_gas_origin * self.gas_limit_multiplier)
  195. estimated_wei = Decimal(estimated_gas) * gas_price
  196. estimated_eth = Decimal(estimated_wei / Decimal('1e18')) / Decimal(2) # 除以2才是比較接近正常消耗的gas費,否則會過於高估
  197. estimated_eth = estimated_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  198. msg = f"估算的燃气量: {estimated_gas}, eth消耗: {estimated_eth}, gas price: {gas_price_gwei} gwei, gas估算通過"
  199. logger.info(msg)
  200. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  201. # step3, 費用與利潤比較
  202. estimated_eth_value = estimated_eth * self.eth_price
  203. estimated_eth_value = estimated_eth_value.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  204. cost = estimated_eth_value + self.WITHDRAW_FEE # 成本
  205. if self.profit < cost:
  206. msg = f"費用判斷不通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
  207. logger.info(msg)
  208. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  209. self._set_state(self.STATE_REJECT)
  210. return
  211. msg = f"費用判斷通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
  212. logger.info(msg)
  213. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  214. # step4, 與賬戶eth餘額比對(至少留0.001,不然沒gas了)
  215. MARGIN = Decimal(0.001)
  216. # 暫時鎖住core_data
  217. with self.core_lock:
  218. eth_balance = self.core_data['eth_balance']
  219. if eth_balance - estimated_eth < MARGIN:
  220. msg = f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
  221. logger.info(msg)
  222. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  223. self._set_state(self.STATE_REJECT)
  224. return
  225. # 餘額判斷通過后預扣除balance,防止綫程更新不及時導致其他綫程誤發送tx
  226. self.core_data['eth_balance'] = self.core_data['eth_balance'] - estimated_eth
  227. msg = f"gas餘額判斷通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
  228. logger.info(msg)
  229. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  230. # final, 設定交易狀態,開始交易
  231. self._set_state(self.STATE_SELLING_ON_EXCHANGE)
  232. except Exception as e:
  233. exc_traceback = traceback.format_exc()
  234. msg = f"前置檢查未通過\n{exc_traceback}"
  235. logger.error(msg)
  236. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  237. self._set_state(self.STATE_REJECT)
  238. # traceback.print_exc()
  239. # 以下是每个状态对应的具体执行函数
  240. def _execute_sell_on_exchange(self):
  241. """
  242. 在中心化交易所卖出现货
  243. """
  244. msg = "执行:中心化交易所卖出现货..."
  245. logger.info(msg)
  246. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  247. try:
  248. # 第一步直接卖出,这个数量用固定数量
  249. pseudo_amount_to_sell = self.exchange_sell_amount
  250. # 处理精度
  251. pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(Decimal('1'), rounding=ROUND_DOWN)
  252. order_params = {
  253. "symbol": self.symbol.replace('_', ''),
  254. "side": "SELL",
  255. "type": "MARKET",
  256. "quantity": int(pseudo_amount_to_sell),
  257. }
  258. order_params_formated = pformat(order_params, indent=2)
  259. exchange_sell_order = mexc.trade.post_order(order_params)
  260. exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
  261. msg = f"交易所现货卖出订单已发送 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
  262. if 'orderId' not in exchange_sell_order:
  263. logger.error(msg)
  264. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  265. self._set_state(self.STATE_FAILED)
  266. return
  267. self.exchange_sell_order_id = exchange_sell_order['orderId']
  268. logger.info(msg)
  269. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  270. self._set_state(self.STATE_WAITING_SELL_CONFIRM)
  271. except Exception as e:
  272. exc_traceback = traceback.format_exc()
  273. msg = f"交易所现货卖出下单失败\n{exc_traceback}"
  274. logger.error(msg)
  275. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  276. self._set_state(self.STATE_FAILED)
  277. # traceback.print_exc()
  278. def _wait_sell_confirm(self):
  279. """
  280. 等待交易所现货卖出订单确认(完全成交)
  281. """
  282. exchange_sell_order_id = self.exchange_sell_order_id
  283. msg = f"等待交易所现货卖出订单确认:{exchange_sell_order_id}"
  284. logger.info(msg)
  285. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  286. try:
  287. # 查询交易所订单状态
  288. waiting_times = 30
  289. last_order = None
  290. while waiting_times > 0:
  291. params = {
  292. "symbol": self.symbol.replace('_', ''),
  293. "orderId": exchange_sell_order_id
  294. }
  295. order = mexc.trade.get_order(params)
  296. last_order = order
  297. if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
  298. money = Decimal(order['cummulativeQuoteQty'])
  299. amount = self.exchange_sell_amount
  300. self.sell_price = money / amount
  301. self.sell_price = self.sell_price.quantize(Decimal('1e-16'), rounding=ROUND_DOWN)
  302. order_formated = pformat(order, indent=2)
  303. msg = f"交易所现货卖出订单已完成, 价格:{self.sell_price}, money: {money}\n order: {order_formated}"
  304. logger.info(msg)
  305. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  306. self.exchange_withdrawal_amount = money
  307. self._set_state(self.STATE_BUYING_ON_CHAIN)
  308. return
  309. else:
  310. time.sleep(1)
  311. waiting_times = waiting_times - 1
  312. last_order_formated = pformat(last_order, indent=2)
  313. msg = f"交易所现货卖出订单失敗, 最後狀態:\n{last_order_formated}。"
  314. logger.info(msg)
  315. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  316. self._set_state(self.STATE_FAILED)
  317. except Exception as e:
  318. exc_traceback = traceback.format_exc()
  319. msg = f"查询交易所现货卖出订单状态时发生错误\n{exc_traceback}"
  320. logger.error(msg)
  321. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  322. self._set_state(self.STATE_FAILED)
  323. # traceback.print_exc()
  324. def _execute_buy_on_chain(self):
  325. """
  326. 在链上执行买入操作
  327. """
  328. msg = "执行:链上买入操作..."
  329. logger.info(msg)
  330. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  331. try:
  332. # 交易前nonce
  333. with self.core_lock:
  334. self.tx['nonce'] = self.core_data['nonce']
  335. # 调用链上客户端执行买入交易
  336. signed_tx = web3._sign(self.tx, self.gas_limit_multiplier)
  337. self.chain_tx_hash = web3.w3.to_hex(signed_tx.hash)
  338. try:
  339. web3.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
  340. except Exception as e:
  341. msg = f"据反饋說链上买入失败:{e}, 交易哈希:{self.chain_tx_hash}"
  342. logger.error(msg)
  343. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  344. # 交易成功后刷新全局nonce
  345. with self.core_lock:
  346. self.core_data['nonce'] = self.core_data['nonce'] + 1
  347. block_number = self.core_data['block_number']
  348. # 將hash放入pending裏,等待確認
  349. with self.pending_lock:
  350. self.pending_data[self.chain_tx_hash] = {
  351. "block_number": block_number,
  352. "tx_details": None,
  353. "reponse": None,
  354. }
  355. # 交易成功
  356. msg = f"再次確認交易是否上鏈:{self.chain_tx_hash}"
  357. logger.info(msg)
  358. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  359. self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
  360. except Exception as e:
  361. exc_traceback = traceback.format_exc()
  362. msg = f"鏈上買入未處理的錯誤, 交易哈希:{self.chain_tx_hash}\n{exc_traceback}"
  363. logger.error(msg)
  364. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  365. self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
  366. # traceback.print_exc()
  367. def _wait_chain_confirm(self):
  368. """
  369. 等待链上交易确认
  370. """
  371. chain_tx_hash = self.chain_tx_hash
  372. msg = f"等待链上交易确认:{chain_tx_hash}"
  373. logger.info(msg)
  374. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  375. try:
  376. # 給300秒時間進行確認
  377. waiting_times = 300
  378. while waiting_times > 0:
  379. with self.pending_lock:
  380. tx_details = copy.deepcopy(self.pending_data[chain_tx_hash]['tx_details'])
  381. if tx_details is None:
  382. waiting_times = waiting_times - 1
  383. time.sleep(1)
  384. continue
  385. # # 交易確認后,移除出pending列表
  386. # with self.pending_lock:
  387. # del self.pending_data[chain_tx_hash]
  388. # 交易失敗的邏輯處理,直接進行回滾
  389. if 'fromTokenDetails' not in tx_details \
  390. or 'toTokenDetails' not in tx_details:
  391. msg = f"链上交易失敗。{tx_details}"
  392. logger.info(msg)
  393. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  394. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  395. break
  396. tx_details_formated = pformat(tx_details, indent=2)
  397. msg = f"链上交易已确认。\n details: {tx_details_formated}"
  398. logger.info(msg)
  399. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  400. # 獲取交易信息
  401. from_token_details = tx_details['fromTokenDetails']
  402. to_token_details = tx_details['toTokenDetails']
  403. from_token_amount = Decimal(from_token_details['amount'])
  404. from_token_amount_human = from_token_amount / (Decimal(10) ** self.from_token_decimal)
  405. from_token_amount_human = from_token_amount_human.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  406. self.chain_buy_amount = from_token_amount_human # 存储实际买入数量
  407. to_token_amount = Decimal(to_token_details['amount'])
  408. to_token_amount_human = to_token_amount / (Decimal(10) ** self.to_token_decimal)
  409. self.buy_price = from_token_amount_human / to_token_amount_human
  410. self.buy_price = self.buy_price.quantize(Decimal('1e-16'), rounding=ROUND_DOWN)
  411. # 交易預估利潤百分比計算
  412. rate = self.sell_price / self.buy_price
  413. rate = rate.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  414. msg = f"【比率{rate}】。用{from_token_amount_human}买入{to_token_amount_human},价格{self.buy_price}。"
  415. logger.info(msg)
  416. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  417. # 判斷快速二賣條件
  418. diff = int(to_token_amount_human - self.exchange_sell_amount)
  419. value = diff * self.sell_price
  420. value = value.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  421. if value > 2:
  422. msg = f"滿足二賣條件,{diff}*{self.sell_price} = {value}"
  423. logger.info(msg)
  424. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  425. order_params = {
  426. "symbol": self.symbol.replace('_', ''),
  427. "side": "SELL",
  428. "type": "MARKET",
  429. "quantity": int(diff),
  430. }
  431. order_params_formated = pformat(order_params, indent=2)
  432. exchange_sell_order = mexc.trade.post_order(order_params)
  433. exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
  434. if 'orderId' not in exchange_sell_order:
  435. msg = f"交易所现货二卖下单失败 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
  436. logger.error(msg)
  437. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  438. else:
  439. oid = exchange_sell_order['orderId']
  440. # 查询交易所订单状态
  441. waiting_times_inner = 30
  442. last_order = None
  443. while waiting_times_inner > 0:
  444. params = {
  445. "symbol": self.symbol.replace('_', ''),
  446. "orderId": oid
  447. }
  448. order = mexc.trade.get_order(params)
  449. order_formated = pformat(order, indent=2)
  450. last_order = order
  451. if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
  452. money = Decimal(order['cummulativeQuoteQty'])
  453. msg = f"交易所现货二卖订单已完成, money: {money}。\n order: {order_formated}"
  454. logger.info(msg)
  455. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  456. self.exchange_withdrawal_amount = self.exchange_withdrawal_amount + money
  457. break
  458. else:
  459. time.sleep(1)
  460. waiting_times_inner = waiting_times_inner - 1
  461. if waiting_times_inner <= 0:
  462. last_order_formated = pformat(last_order, indent=2)
  463. msg = f"交易所现货二卖订单失敗, 最後狀態:{last_order_formated}。"
  464. logger.info(msg)
  465. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  466. else:
  467. msg = f"不滿足二賣條件,{diff}*{self.sell_price} = {value}"
  468. logger.info(msg)
  469. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  470. # 計算實際利潤
  471. actual_profit = value
  472. actual_gas_price = Decimal(tx_details['gasPrice'])
  473. actual_gas_price_gwei = actual_gas_price / Decimal('1e9')
  474. actual_gas_price_gwei = actual_gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
  475. actual_gas_used = Decimal(tx_details['gasUsed'])
  476. actual_wei = actual_gas_price * actual_gas_used
  477. actual_eth = actual_wei / Decimal('1e18')
  478. actual_eth = actual_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  479. actual_fee_used = actual_eth * self.eth_price
  480. actual_fee_used = actual_fee_used.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  481. actual_profit = value - actual_fee_used - self.WITHDRAW_FEE
  482. msg = f"【最終利潤】{actual_profit}{self.base_coin}(已扣除所有手續費、滑點)\
  483. \n鏈上ETH使用: {actual_eth}({actual_fee_used} USD), gas_price: {actual_gas_price_gwei} GWEI, gas_used: {actual_gas_used}\
  484. \n交易所出售代幣利潤: {value}, 提現手續費: {self.WITHDRAW_FEE}\
  485. "
  486. logger.info(msg)
  487. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  488. self._set_state(self.STATE_COMPLETED)
  489. break
  490. # 如果300秒都沒確認成功,該交易大概率沒有上鏈
  491. if waiting_times <= 0:
  492. with self.pending_lock:
  493. response = copy.deepcopy(self.pending_data[chain_tx_hash]['response'])
  494. response_formated = pformat(response, indent=2)
  495. msg = f"链上交易确认失败:{chain_tx_hash}\n{response_formated}"
  496. logger.error(msg)
  497. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  498. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  499. except Exception as e:
  500. exc_traceback = traceback.format_exc()
  501. msg = f"查询链上确认状态时发生错误\n{exc_traceback}"
  502. logger.error(msg)
  503. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  504. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  505. # traceback.print_exc()
  506. def _wait_exchange_rollback(self):
  507. """
  508. 市价进行交易所交易回滚
  509. """
  510. msg = "执行:中心化交易所买入现货回滚..."
  511. logger.info(msg)
  512. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  513. try:
  514. # 使用预提现数量进行回滚
  515. pseudo_amount_to_buy = Decimal(self.exchange_withdrawal_amount)
  516. # 处理精度
  517. pseudo_amount_to_buy = pseudo_amount_to_buy.quantize(Decimal('1'), rounding=ROUND_DOWN)
  518. # 交易所U余额判断
  519. with self.mexc_lock:
  520. balances = self.mexc_data['account_info']['balances']
  521. for balance in balances:
  522. if balance['asset'] == self.base_coin:
  523. pseudo_amount_to_buy = min(Decimal(balance['free']), pseudo_amount_to_buy)
  524. if pseudo_amount_to_buy < Decimal('10'):
  525. msg = f"交易所剩余{self.base_coin}: {balance['free']}, 小于10, 不能触发回滚交易。"
  526. logger.info(msg)
  527. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  528. self._set_state(self.STATE_FAILED)
  529. return
  530. else:
  531. msg = f"交易所剩余{self.base_coin}: {balance['free']}, 交易所准备使用:{pseudo_amount_to_buy}, 余额校验通过(可以回滚)。"
  532. logger.info(msg)
  533. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  534. break
  535. order_params = {
  536. "symbol": self.symbol.replace('_', ''),
  537. "side": "BUY",
  538. "type": "MARKET",
  539. "quoteOrderQty": int(pseudo_amount_to_buy),
  540. }
  541. order_params_formated = pformat(order_params, indent=2)
  542. exchange_buy_order = mexc.trade.post_order(order_params)
  543. exchange_buy_order_formated = pformat(exchange_buy_order, indent=2)
  544. if 'orderId' not in exchange_buy_order:
  545. msg = f"【回滚】交易所现货买入下单失败\n params:{order_params_formated}\norder: {exchange_buy_order_formated}"
  546. logger.error(msg)
  547. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  548. self._set_state("FAILED")
  549. return
  550. exchange_buy_order_id = exchange_buy_order['orderId']
  551. msg = f"【回滚】交易所现货买入订单已发送, 订单ID: {exchange_buy_order_id}"
  552. logger.info(msg)
  553. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  554. # 查询交易所订单状态
  555. waiting_times = 30
  556. last_query_rst = None
  557. while True:
  558. params = {
  559. "symbol": self.symbol.replace('_', ''),
  560. "orderId": exchange_buy_order_id
  561. }
  562. order = mexc.trade.get_order(params)
  563. order_formated = pformat(order, indent=2)
  564. last_query_rst = order
  565. if order['status'] == "FILLED":
  566. money = Decimal(order['cummulativeQuoteQty'])
  567. amount = self.exchange_sell_amount
  568. price = money / amount
  569. price = price.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  570. msg = f"【回滚】交易所现货买入订单已完全成交, 价格:{price}。\norder: {order_formated}"
  571. logger.info(msg)
  572. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  573. self._set_state(self.STATE_FAILED)
  574. return
  575. else:
  576. # 继续等待成交
  577. pass
  578. time.sleep(1)
  579. waiting_times = waiting_times - 1
  580. last_query_rst_formated = pformat(last_query_rst, indent=2)
  581. msg = f"【回滚】回滚交易订单查询超时, 订单ID: {exchange_buy_order_id}\n最终状态:{last_query_rst_formated}"
  582. logger.info(msg)
  583. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  584. self._set_state(self.STATE_FAILED)
  585. except Exception as e:
  586. exc_traceback = traceback.format_exc()
  587. msg = f"【回滚】交易所回滚交易失败\n{exc_traceback}"
  588. logger.error(msg)
  589. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  590. self._set_state(self.STATE_FAILED)
  591. # traceback.print_exc()
  592. # 伪代码示例:如何使用这个类
  593. if __name__ == "__main__":
  594. import ok_chain_client
  595. import decimal
  596. import pprint
  597. CHAIN_ID = 1
  598. FROM_TOKEN = '0xdAC17F958D2ee523a2206206994597C13D831ec7'
  599. FROM_TOKEN_AMOUNT_HUMAM = Decimal('20')
  600. FROM_TOKEN_DECIMAL = 6
  601. TO_TOKEN = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860'
  602. USER_WALLET = ''
  603. USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
  604. SYMBOL = "RATO_USDT"
  605. # 询价,注意!!!这里直接把交易所地址当收款方,省去transfer的流程
  606. data = ok_chain_client.swap(CHAIN_ID,
  607. FROM_TOKEN_AMOUNT_HUMAM * (10 ** FROM_TOKEN_DECIMAL),
  608. FROM_TOKEN,
  609. TO_TOKEN,
  610. 1,
  611. USER_WALLET,
  612. USER_EXCHANGE_WALLET, # 这里直接把交易所地址当收款方,省去transfer的流程!!!
  613. )
  614. if data.get('code') != '0' or not data.get('data'):
  615. pprint.pprint(data)
  616. pprint.pprint({
  617. "error": f"OK API错误({1}) - Code:{data.get('code', 'N/A')}, Msg:{data.get('msg', data.get('message', 'N/A')) if isinstance(data, dict) else '格式错误'}"})
  618. raise Exception("")
  619. d = data['data'][0]
  620. tx = d['tx']
  621. router_result = d['routerResult']
  622. in_dec, out_dec = int(router_result['fromToken']['decimal']), int(router_result['toToken']['decimal'])
  623. atomic_in_base, atomic_out_target = Decimal(router_result['fromTokenAmount']), Decimal(router_result['toTokenAmount'])
  624. human_in_base = atomic_in_base / (10 ** in_dec)
  625. human_out_target = atomic_out_target / (10 ** out_dec)
  626. FROM_TOKEN_AMOUNT_HUMAM = human_in_base
  627. TO_TOKEN_AMOUNT_HUMAM = human_out_target
  628. pprint.pprint(tx)
  629. # 套利流程执行
  630. process_item = {
  631. "stateFlow": [], # 状态流转记录
  632. }
  633. ap = ArbitrageProcess(tx, 2, 1.2,
  634. FROM_TOKEN, TO_TOKEN,
  635. FROM_TOKEN_AMOUNT_HUMAM, TO_TOKEN_AMOUNT_HUMAM,
  636. USER_EXCHANGE_WALLET, USER_WALLET,
  637. SYMBOL, process_item)
  638. # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
  639. ap._set_state(ap.SELLING_ON_EXCHANGE)
  640. # 在主循环中周期性调用 run_arbitrage_step
  641. while ap.current_state != "COMPLETED" and ap.current_state != "FAILED":
  642. ap.run_arbitrage_step()
  643. if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
  644. time.sleep(10)
  645. # else:
  646. # time.sleep(1)
  647. logger.info(process_item)
  648. if ap.current_state == "COMPLETED":
  649. logger.info("套利流程执行成功!")
  650. else:
  651. logger.info("套利流程执行失败!")