erc20_to_mexc.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962
  1. import time
  2. import traceback
  3. import copy
  4. from web3_py_client import EthClient
  5. from mexc_client import MexcClient
  6. from decimal import Decimal, ROUND_DOWN
  7. from as_utils import add_state_flow_entry
  8. from checker.logger_config import get_logger
  9. from pprint import pformat
# Module-level client instances, created once at import time and used by
# the ArbitrageProcess methods below.
web3 = EthClient()
mexc = MexcClient()
# Configure logging
logger = get_logger('as')
  14. class ArbitrageProcess:
  15. def __init__(self, tx, gas_limit_multiplier, gas_price_multiplier, process_item,
  16. core_data, core_lock,
  17. pending_data, pending_lock,
  18. ):
  19. """
  20. 初始化套利流程
  21. Args:
  22. tx: 在链上要发送交易的tx
  23. gas_limit_multiplier: gas limit倍数, 一般都不加倍
  24. gas_price_multiplier: gas price倍数, 可以提高交易成功率
  25. """
  26. self.WITHDRAW_FEE = Decimal(0.5) # 提現手續費
  27. tx.pop('gasPrice', None)
  28. tx.pop('value', None)
  29. tx.pop('minReceiveAmount', None)
  30. tx.pop('slippage', None)
  31. tx.pop('maxSpendAmount', None)
  32. tx.pop('signatureData', None)
  33. self.core_data = core_data
  34. self.core_lock = core_lock
  35. self.pending_data = pending_data
  36. self.pending_lock = pending_lock
  37. with self.core_lock:
  38. self.eth_price = self.core_data['eth_price']
  39. self.tx = tx
  40. self.profit = Decimal(process_item['profit']) # 這個利潤是實際到手利潤
  41. self.profit_limit = Decimal(process_item['profitLimit']) # 這個利潤是實際到手利潤的limit
  42. self.gas_limit_multiplier = gas_limit_multiplier
  43. self.gas_price_multiplier = gas_price_multiplier
  44. self.from_token_addr = process_item['fromToken']
  45. self.from_token_decimal = Decimal(process_item['fromTokenDecimal'])
  46. self.to_token_addr = process_item['toToken']
  47. self.to_token_decimal = Decimal(process_item['toTokenDecimal'])
  48. self.user_exchange_wallet = process_item['userExchangeWallet']
  49. self.user_wallet = process_item['userWallet']
  50. self.symbol = process_item['symbol']
  51. self.coin = self.symbol.split('_')[0]
  52. self.base_coin = self.symbol.split('_')[1]
  53. self.process_item = process_item
  54. # 存储当前套利交易的细节信息,例如买入数量、价格等
  55. self.sell_price = Decimal(0)
  56. self.buy_price = Decimal(0)
  57. self.chain_tx_hash = None # 链上买入的tx hash
  58. self.chain_usdt_use = Decimal(process_item['fromTokenAmountHuman']) # 链上usdt减少量(使用量)
  59. self.chain_buy_amount = Decimal(0) # 链上币增加量(购入量), todo, 暂用即时余额代替
  60. self.exchange_sell_amount = Decimal(process_item['exchangeOutAmount']) # 交易所卖出量
  61. self.exchange_sell_order_id = None # 交易所卖出id
  62. self.exchange_withdrawal_id = None # 交易所提现id
  63. self.exchange_withdrawal_amount = None # 交易所提现数量
  64. self.actual_profit = Decimal(0) # 實際利潤
  65. # 定义可能的状态
  66. self.STATES = [
  67. "CHECK", # 检查余额、估算gas等
  68. "SELLING_ON_EXCHANGE", # 正在中心化交易所卖出现货
  69. "WAITING_SELL_CONFIRM", # 等待现货卖出订单确认
  70. "BUYING_ON_CHAIN", # 正在链上买入
  71. "WAITING_CHAIN_CONFIRM", # 等待链上交易确认
  72. "WAITING_EXCHANGE_ROLLBACK", # 等待交易所回滚
  73. # "HEDGING_ON_EXCHANGE", # 正在中心化交易所套保
  74. # "WAITING_HEDGE_CONFIRM", # 等待套保订单确认
  75. # "TRANSFERRING_TO_EXCHANGE", # 正在向交易所转账
  76. # "CLOSING_HEDGE", # 正在平掉套保单
  77. # "WAITING_CLOSE_HEDGE_CONFIRM", # 等待平掉套保单确认
  78. "WAITING_TRANSFER_ARRIVE", # 等待交易所充值到账
  79. "TRANSFERRING_TO_CHAIN", # 正在向链上转账
  80. "WAITING_WITHDRAWAL_CONFIRM", # 等待链上提现确认
  81. "COMPLETED", # 套利流程完成
  82. "REJECT", # 套利被程序拒绝
  83. "FAILED" # 套利流程失败
  84. ]
  85. self.STATE_IDLE = "IDLE"
  86. self.STATE_CHECK = "CHECK"
  87. self.STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE"
  88. self.STATE_WAITING_SELL_CONFIRM = "WAITING_SELL_CONFIRM"
  89. self.STATE_BUYING_ON_CHAIN = "BUYING_ON_CHAIN"
  90. self.STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM"
  91. self.STATE_WAITING_EXCHANGE_ROLLBACK = "WAITING_EXCHANGE_ROLLBACK"
  92. # self.STATE_TRANSFERRING_TO_EXCHANGE = "TRANSFERRING_TO_EXCHANGE"
  93. self.STATE_WAITING_TRANSFER_ARRIVE = "WAITING_TRANSFER_ARRIVE"
  94. self.STATE_TRANSFERRING_TO_CHAIN = "TRANSFERRING_TO_CHAIN"
  95. self.STATE_WAITING_WITHDRAWAL_CONFIRM = "WAITING_WITHDRAWAL_CONFIRM"
  96. self.STATE_COMPLETED = "COMPLETED"
  97. self.STATE_REJECT = "REJECT"
  98. self.STATE_FAILED = "FAILED"
  99. self.current_state = "IDLE"
  100. def _set_state(self, state):
  101. """
  102. 设置系统状态,并打印日志
  103. """
  104. if state in self.STATES:
  105. logger.info(f"状态变更:{self.current_state} -> {state}")
  106. logger.info('')
  107. self.current_state = state
  108. else:
  109. logger.error(f"尝试设置无效状态:{state}")
  110. def run_arbitrage_step(self):
  111. """
  112. 根据当前状态执行套利流程的下一步
  113. 这是一个周期性调用的函数,例如在主循环中调用
  114. """
  115. if self.current_state == self.STATE_CHECK:
  116. self._execute_check()
  117. elif self.current_state == self.STATE_SELLING_ON_EXCHANGE:
  118. self._execute_sell_on_exchange()
  119. elif self.current_state == self.STATE_WAITING_SELL_CONFIRM:
  120. self._wait_sell_confirm()
  121. elif self.current_state == self.STATE_BUYING_ON_CHAIN:
  122. self._execute_buy_on_chain()
  123. elif self.current_state == self.STATE_WAITING_CHAIN_CONFIRM:
  124. self._wait_chain_confirm()
  125. elif self.current_state == self.STATE_WAITING_EXCHANGE_ROLLBACK:
  126. self._wait_exchange_rollback()
  127. # elif self.current_state == "TRANSFERRING_TO_EXCHANGE":
  128. # self._execute_transfer_to_exchange()
  129. elif self.current_state == self.STATE_WAITING_TRANSFER_ARRIVE:
  130. self._wait_transfer_arrive()
  131. elif self.current_state == self.STATE_TRANSFERRING_TO_CHAIN:
  132. self._execute_transfer_to_chain()
  133. elif self.current_state == self.STATE_WAITING_WITHDRAWAL_CONFIRM:
  134. self._wait_withdrawal_confirm()
  135. elif self.current_state == self.STATE_COMPLETED:
  136. msg = "套利流程成功完成!"
  137. logger.info(msg)
  138. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  139. elif self.current_state == self.STATE_REJECT:
  140. msg = "套利流程被程序拒绝"
  141. logger.error(msg)
  142. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  143. elif self.current_state == self.STATE_FAILED:
  144. msg = "套利流程失败!"
  145. logger.error(msg)
  146. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  147. def _execute_check(self):
  148. """
  149. 前置检查,防止低能错误
  150. """
  151. try:
  152. # step1,檢查交易所的餘額是否夠用
  153. # 处理精度
  154. pseudo_amount_to_sell = self.exchange_sell_amount.quantize(Decimal('1'), rounding=ROUND_DOWN)
  155. # 交易所套保余额判断
  156. balances = mexc.trade.get_account_info()['balances']
  157. for balance in balances:
  158. if balance['asset'] == self.coin:
  159. if Decimal(balance['free']) < pseudo_amount_to_sell:
  160. msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 不能触发套保交易。"
  161. logger.info(msg)
  162. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  163. self._set_state(self.STATE_REJECT)
  164. return
  165. else:
  166. msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 余额校验通过(可以套保)。"
  167. logger.info(msg)
  168. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  169. break
  170. # step2,估算gas
  171. latest_block = web3.w3.eth.get_block('latest')
  172. self.tx['maxPriorityFeePerGas'] = int(int(self.tx['maxPriorityFeePerGas']) * self.gas_price_multiplier)
  173. self.tx['maxFeePerGas'] = int(int(latest_block['baseFeePerGas']) * 2 + self.tx['maxPriorityFeePerGas'])
  174. gas_price = Decimal(self.tx['maxPriorityFeePerGas'] + self.tx['maxFeePerGas'])
  175. gas_price_gwei = gas_price / Decimal('1e9')
  176. gas_price_gwei = gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
  177. estimated_gas_origin = web3.w3.eth.estimate_gas(self.tx)
  178. estimated_gas = int(estimated_gas_origin * self.gas_limit_multiplier)
  179. estimated_wei = Decimal(estimated_gas) * gas_price
  180. estimated_eth = Decimal(estimated_wei / Decimal('1e18')) / Decimal(2) # 除以2才是比較接近正常消耗的gas費,否則會過於高估
  181. estimated_eth = estimated_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  182. msg = f"估算的燃气量: {estimated_gas}, eth消耗: {estimated_eth}, gas price: {gas_price_gwei} gwei, gas估算通過"
  183. logger.info(msg)
  184. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  185. # step3, 費用與利潤比較
  186. estimated_eth_value = estimated_eth * self.eth_price
  187. estimated_eth_value = estimated_eth_value.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  188. cost = estimated_eth_value + self.WITHDRAW_FEE # 成本
  189. if self.profit < cost:
  190. msg = f"費用判斷不通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
  191. logger.info(msg)
  192. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  193. self._set_state(self.STATE_REJECT)
  194. return
  195. msg = f"費用判斷通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
  196. logger.info(msg)
  197. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  198. # step4, 與賬戶eth餘額比對(至少留0.001,不然沒gas了)
  199. MARGIN = 0.001
  200. eth_balance_origin = web3.w3.eth.get_balance(self.user_wallet)
  201. eth_balance = Decimal(eth_balance_origin / (10 ** 18))
  202. eth_balance = eth_balance.quantize(Decimal('1e-6'), rounding=ROUND_DOWN)
  203. if eth_balance - estimated_eth < MARGIN:
  204. msg = f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
  205. logger.info(msg)
  206. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  207. self._set_state(self.STATE_REJECT)
  208. return
  209. msg = f"gas餘額判斷通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
  210. logger.info(msg)
  211. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  212. # final, 設定交易狀態,開始交易
  213. self._set_state(self.STATE_SELLING_ON_EXCHANGE)
  214. except Exception as e:
  215. msg = f"前置檢查未通過:{e}"
  216. logger.error(msg)
  217. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  218. self._set_state(self.STATE_REJECT)
  219. traceback.print_exc()
  220. # 以下是每个状态对应的具体执行函数
  221. def _execute_sell_on_exchange(self):
  222. """
  223. 在中心化交易所卖出现货
  224. """
  225. msg = "执行:中心化交易所卖出现货..."
  226. logger.info(msg)
  227. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  228. try:
  229. # 第一步直接卖出,这个数量用固定数量
  230. pseudo_amount_to_sell = self.exchange_sell_amount
  231. # 处理精度
  232. pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(Decimal('1'), rounding=ROUND_DOWN)
  233. order_params = {
  234. "symbol": self.symbol.replace('_', ''),
  235. "side": "SELL",
  236. "type": "MARKET",
  237. "quantity": int(pseudo_amount_to_sell),
  238. }
  239. order_params_formated = pformat(order_params, indent=2)
  240. exchange_sell_order = mexc.trade.post_order(order_params)
  241. exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
  242. msg = f"交易所现货卖出订单已发送 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
  243. if 'orderId' not in exchange_sell_order:
  244. logger.error(msg)
  245. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  246. self._set_state(self.STATE_FAILED)
  247. return
  248. self.exchange_sell_order_id = exchange_sell_order['orderId']
  249. logger.info(msg)
  250. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  251. self._set_state(self.STATE_WAITING_SELL_CONFIRM)
  252. except Exception as e:
  253. msg = f"交易所现货卖出下单失败:{e}"
  254. logger.error(msg)
  255. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  256. self._set_state(self.STATE_FAILED)
  257. traceback.print_exc()
  258. def _wait_sell_confirm(self):
  259. """
  260. 等待交易所现货卖出订单确认(完全成交)
  261. """
  262. exchange_sell_order_id = self.exchange_sell_order_id
  263. msg = f"等待交易所现货卖出订单确认:{exchange_sell_order_id}"
  264. logger.info(msg)
  265. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  266. try:
  267. # 查询交易所订单状态
  268. waiting_times = 30
  269. last_order = None
  270. while waiting_times > 0:
  271. params = {
  272. "symbol": self.symbol.replace('_', ''),
  273. "orderId": exchange_sell_order_id
  274. }
  275. order = mexc.trade.get_order(params)
  276. last_order = order
  277. if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
  278. money = Decimal(order['cummulativeQuoteQty'])
  279. amount = self.exchange_sell_amount
  280. self.sell_price = money / amount
  281. self.sell_price = self.sell_price.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  282. order_formated = pformat(order, indent=2)
  283. msg = f"交易所现货卖出订单已完成, 价格:{self.sell_price}, money: {money}\n order: {order_formated}"
  284. logger.info(msg)
  285. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  286. self.exchange_withdrawal_amount = money
  287. self._set_state(self.STATE_BUYING_ON_CHAIN)
  288. return
  289. else:
  290. time.sleep(1)
  291. waiting_times = waiting_times - 1
  292. last_order_formated = pformat(last_order, indent=2)
  293. msg = f"交易所现货卖出订单失敗, 最後狀態:\n{last_order_formated}。"
  294. logger.info(msg)
  295. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  296. self._set_state(self.STATE_FAILED)
  297. except Exception as e:
  298. msg = f"查询交易所现货卖出订单状态时发生错误:{e}"
  299. logger.error(msg)
  300. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  301. self._set_state(self.STATE_FAILED)
  302. traceback.print_exc()
  303. def _execute_buy_on_chain(self):
  304. """
  305. 在链上执行买入操作
  306. """
  307. msg = "执行:链上买入操作..."
  308. logger.info(msg)
  309. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  310. try:
  311. # 交易前nonce
  312. with self.core_lock:
  313. self.tx['nonce'] = self.core_data['nonce']
  314. # 调用链上客户端执行买入交易
  315. signed_tx = web3._sign(self.tx, self.gas_limit_multiplier)
  316. self.chain_tx_hash = web3.w3.to_hex(signed_tx.hash)
  317. try:
  318. web3.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
  319. except Exception as e:
  320. msg = f"据反饋說链上买入失败:{e}, 交易哈希:{self.chain_tx_hash}"
  321. logger.error(msg)
  322. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  323. # 交易成功后刷新全局nonce
  324. with self.core_lock:
  325. self.core_data['nonce'] = self.core_data['nonce'] + 1
  326. block_number = self.core_data['block_number']
  327. # 將hash放入pending裏,等待確認
  328. with self.pending_lock:
  329. self.pending_data[self.chain_tx_hash] = {
  330. "block_number": block_number,
  331. "tx_details": None,
  332. "reponse": None,
  333. }
  334. # 交易成功
  335. msg = f"再次確認交易是否上鏈:{self.chain_tx_hash}"
  336. logger.info(msg)
  337. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  338. self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
  339. except Exception as e:
  340. msg = f"鏈上買入未處理的錯誤:{e}, 交易哈希:{self.chain_tx_hash}"
  341. logger.error(msg)
  342. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  343. self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
  344. traceback.print_exc()
  345. def _wait_chain_confirm(self):
  346. """
  347. 等待链上交易确认
  348. """
  349. chain_tx_hash = self.chain_tx_hash
  350. msg = f"等待链上交易确认:{chain_tx_hash}"
  351. logger.info(msg)
  352. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  353. try:
  354. # 給300秒時間進行確認
  355. waiting_times = 300
  356. while waiting_times > 0:
  357. with self.pending_lock:
  358. tx_details = copy.deepcopy(self.pending_data[chain_tx_hash]['tx_details'])
  359. if tx_details is None:
  360. waiting_times = waiting_times - 1
  361. time.sleep(1)
  362. continue
  363. # # 交易確認后,移除出pending列表
  364. # with self.pending_lock:
  365. # del self.pending_data[chain_tx_hash]
  366. # 交易失敗的邏輯處理,直接進行回滾
  367. if 'fromTokenDetails' not in tx_details \
  368. or 'toTokenDetails' not in tx_details:
  369. msg = f"链上交易失敗。{tx_details}"
  370. logger.info(msg)
  371. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  372. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  373. break
  374. tx_details_formated = pformat(tx_details, indent=2)
  375. msg = f"链上交易已确认。\n details: {tx_details_formated}"
  376. logger.info(msg)
  377. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  378. # 獲取交易信息
  379. from_token_details = tx_details['fromTokenDetails']
  380. to_token_details = tx_details['toTokenDetails']
  381. from_token_amount = Decimal(from_token_details['amount'])
  382. from_token_amount_human = from_token_amount / (Decimal(10) ** self.from_token_decimal)
  383. from_token_amount_human = from_token_amount_human.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  384. self.chain_buy_amount = from_token_amount_human # 存储实际买入数量
  385. to_token_amount = Decimal(to_token_details['amount'])
  386. to_token_amount_human = to_token_amount / (Decimal(10) ** self.to_token_decimal)
  387. self.buy_price = from_token_amount_human / to_token_amount_human
  388. self.buy_price = self.buy_price.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  389. # 交易預估利潤百分比計算
  390. rate = self.sell_price / self.buy_price
  391. rate = rate.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  392. msg = f"【比率{rate}】。用{from_token_amount_human}买入{to_token_amount_human},价格{self.buy_price}。"
  393. logger.info(msg)
  394. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  395. # 判斷快速二賣條件
  396. diff = int(to_token_amount_human - self.exchange_sell_amount)
  397. value = diff * self.sell_price
  398. value = value.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  399. if value > 2:
  400. msg = f"滿足二賣條件,{diff}*{self.sell_price} = {value}"
  401. logger.info(msg)
  402. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  403. order_params = {
  404. "symbol": self.symbol.replace('_', ''),
  405. "side": "SELL",
  406. "type": "MARKET",
  407. "quantity": int(diff),
  408. }
  409. order_params_formated = pformat(order_params, indent=2)
  410. exchange_sell_order = mexc.trade.post_order(order_params)
  411. exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
  412. if 'orderId' not in exchange_sell_order:
  413. msg = f"交易所现货二卖下单失败 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
  414. logger.error(msg)
  415. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  416. else:
  417. oid = exchange_sell_order['orderId']
  418. # 查询交易所订单状态
  419. waiting_times_inner = 30
  420. last_order = None
  421. while waiting_times_inner > 0:
  422. params = {
  423. "symbol": self.symbol.replace('_', ''),
  424. "orderId": oid
  425. }
  426. order = mexc.trade.get_order(params)
  427. order_formated = pformat(order, indent=2)
  428. last_order = order
  429. if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
  430. money = Decimal(order['cummulativeQuoteQty'])
  431. msg = f"交易所现货二卖订单已完成, money: {money}。\n order: {order_formated}"
  432. logger.info(msg)
  433. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  434. self.exchange_withdrawal_amount = self.exchange_withdrawal_amount + money
  435. break
  436. else:
  437. time.sleep(1)
  438. waiting_times_inner = waiting_times_inner - 1
  439. if waiting_times_inner <= 0:
  440. last_order_formated = pformat(last_order, indent=2)
  441. msg = f"交易所现货二卖订单失敗, 最後狀態:{last_order_formated}。"
  442. logger.info(msg)
  443. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  444. else:
  445. msg = f"不滿足二賣條件,{diff}*{self.sell_price} = {value}"
  446. logger.info(msg)
  447. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  448. # 計算實際利潤
  449. actual_profit = value
  450. actual_gas_price = Decimal(tx_details['gasPrice'])
  451. actual_gas_price_gwei = actual_gas_price / Decimal('1e9')
  452. actual_gas_price_gwei = actual_gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
  453. actual_gas_used = Decimal(tx_details['gasUsed'])
  454. actual_wei = actual_gas_price * actual_gas_used
  455. actual_eth = actual_wei / Decimal('1e18')
  456. actual_eth = actual_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  457. actual_fee_used = actual_eth * self.eth_price
  458. actual_fee_used = actual_fee_used.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  459. actual_profit = value - actual_fee_used - self.WITHDRAW_FEE
  460. msg = f"【最終利潤】{actual_profit}{self.base_coin}(已扣除所有手續費、滑點)\
  461. \n鏈上ETH使用: {actual_eth}({actual_fee_used} USD), gas_price: {actual_gas_price_gwei} GWEI, gas_used: {actual_gas_used}\
  462. \n交易所出售代幣利潤: {value}, 提現手續費: {self.WITHDRAW_FEE}\
  463. "
  464. logger.info(msg)
  465. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  466. self._set_state(self.STATE_WAITING_TRANSFER_ARRIVE)
  467. break
  468. # 如果300秒都沒確認成功,該交易大概率沒有上鏈
  469. if waiting_times <= 0:
  470. with self.pending_lock:
  471. response = copy.deepcopy(self.pending_data[chain_tx_hash]['response'])
  472. response_formated = pformat(reponse, indent=2)
  473. msg = f"链上交易确认失败:{chain_tx_hash}\n{response_formated}"
  474. logger.error(msg)
  475. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  476. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  477. except Exception as e:
  478. msg = f"查询链上确认状态时发生错误:{e}"
  479. logger.error(msg)
  480. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  481. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  482. traceback.print_exc()
  483. def _wait_exchange_rollback(self):
  484. """
  485. 市价进行交易所交易回滚
  486. """
  487. msg = "执行:中心化交易所买入现货回滚..."
  488. logger.info(msg)
  489. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  490. try:
  491. # 使用预提现数量进行回滚
  492. pseudo_amount_to_buy = Decimal(self.exchange_withdrawal_amount)
  493. # 处理精度
  494. pseudo_amount_to_buy = pseudo_amount_to_buy.quantize(Decimal('1'), rounding=ROUND_DOWN)
  495. # 交易所U余额判断
  496. balances = mexc.trade.get_account_info()['balances']
  497. for balance in balances:
  498. if balance['asset'] == self.base_coin:
  499. pseudo_amount_to_buy = min(Decimal(balance['free']), pseudo_amount_to_buy)
  500. if pseudo_amount_to_buy < Decimal('10'):
  501. msg = f"交易所剩余{self.base_coin}: {balance['free']}, 小于10, 不能触发回滚交易。"
  502. logger.info(msg)
  503. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  504. self._set_state(self.STATE_FAILED)
  505. return
  506. else:
  507. msg = f"交易所剩余{self.base_coin}: {balance['free']}, 交易所准备使用:{pseudo_amount_to_buy}, 余额校验通过(可以回滚)。"
  508. logger.info(msg)
  509. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  510. break
  511. order_params = {
  512. "symbol": self.symbol.replace('_', ''),
  513. "side": "BUY",
  514. "type": "MARKET",
  515. "quoteOrderQty": int(pseudo_amount_to_buy),
  516. }
  517. order_params_formated = pformat(order_params, indent=2)
  518. exchange_buy_order = mexc.trade.post_order(order_params)
  519. exchange_buy_order_formated = pformat(exchange_buy_order, indent=2)
  520. if 'orderId' not in exchange_buy_order:
  521. msg = f"【回滚】交易所现货买入下单失败\n params:{order_params_formated}\norder: {exchange_buy_order_formated}"
  522. logger.error(msg)
  523. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  524. self._set_state("FAILED")
  525. return
  526. exchange_buy_order_id = exchange_buy_order['orderId']
  527. msg = f"【回滚】交易所现货买入订单已发送, 订单ID: {exchange_buy_order_id}"
  528. logger.info(msg)
  529. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  530. # 查询交易所订单状态
  531. waiting_times = 30
  532. last_query_rst = None
  533. while True:
  534. params = {
  535. "symbol": self.symbol.replace('_', ''),
  536. "orderId": exchange_buy_order_id
  537. }
  538. order = mexc.trade.get_order(params)
  539. order_formated = pformat(order, indent=2)
  540. last_query_rst = order
  541. if order['status'] == "FILLED":
  542. money = Decimal(order['cummulativeQuoteQty'])
  543. amount = self.exchange_sell_amount
  544. price = money / amount
  545. price = price.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  546. msg = f"【回滚】交易所现货买入订单已完全成交, 价格:{price}。\norder: {order_formated}"
  547. logger.info(msg)
  548. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  549. self._set_state(self.STATE_FAILED)
  550. return
  551. else:
  552. # 继续等待成交
  553. pass
  554. time.sleep(1)
  555. waiting_times = waiting_times - 1
  556. last_query_rst_formated = pformat(last_query_rst, indent=2)
  557. msg = f"【回滚】回滚交易订单查询超时, 订单ID: {exchange_buy_order_id}\n最终状态:{last_query_rst_formated}"
  558. logger.info(msg)
  559. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  560. self._set_state(self.STATE_FAILED)
  561. except Exception as e:
  562. msg = f"【回滚】交易所回滚交易失败:{e}"
  563. logger.error(msg)
  564. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  565. self._set_state(self.STATE_FAILED)
  566. traceback.print_exc()
  567. def _wait_transfer_arrive(self):
  568. """
  569. 等待资产在交易所内到账
  570. """
  571. msg = f"等待资产在交易所到账..."
  572. logger.info(msg)
  573. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  574. try:
  575. is_arrived = False
  576. # 先進行快速提現判斷,如果不滿足條件就走後面的等待充值模式,雙模,這個步驟最多等待10分鐘
  577. waiting_times = 10
  578. last_deposit_state = None
  579. while waiting_times > 0:
  580. time.sleep(60)
  581. deposit_list = mexc.wallet.get_deposit_list()
  582. # 是否已經在列表中了,抹茶識別充值會稍微有點慢,所以要耐心等
  583. is_list = False
  584. # pending中的數量
  585. pending_amount = Decimal(0)
  586. for deposit in deposit_list:
  587. # 不屬于該路徑需要監聽的代幣
  588. if deposit['coin'] != self.coin:
  589. continue
  590. # 處理pending數量
  591. if Decimal(deposit['confirmTimes']) < Decimal(deposit['unlockConfirm']):
  592. pending_amount = pending_amount + Decimal(deposit['amount'])
  593. # 檢查到沒到列表中
  594. if deposit['transHash'] != self.chain_tx_hash:
  595. continue
  596. last_deposit_state = deposit
  597. is_list = True
  598. # 檢查是否滿足快速提現的條件
  599. if is_list:
  600. # 交易所代幣余额判断
  601. balances = mexc.trade.get_account_info()['balances']
  602. asset_balance = 0
  603. for balance in balances:
  604. if balance['asset'] == self.coin:
  605. asset_balance = Decimal(balance['free'])
  606. # 交易所賣出餘額
  607. exchange_sell_amount = self.exchange_sell_amount
  608. # 最終判斷
  609. if exchange_sell_amount + asset_balance > pending_amount:
  610. last_deposit_state_formated = pformat(last_deposit_state, indent=2)
  611. msg = f"【flash】资产可以進行快速提現。\n{last_deposit_state_formated}"
  612. logger.info(msg)
  613. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  614. self._set_state(self.STATE_TRANSFERRING_TO_CHAIN)
  615. return
  616. else:
  617. logger.info(f"exchange_sell_amount{exchange_sell_amount}, asset_balance{asset_balance}, pending_amount{pending_amount}")
  618. logger.info(f"正在檢查快速提現條件...({waiting_times}/10)")
  619. waiting_times = waiting_times - 1
  620. # 最多等待30分钟
  621. waiting_times = 30
  622. last_deposit_state = None
  623. last_deposit_state_formated = None
  624. while waiting_times > 0:
  625. deposit_list = mexc.wallet.get_deposit_list()
  626. for deposit in deposit_list:
  627. if deposit['transHash'] != self.chain_tx_hash:
  628. continue
  629. last_deposit_state = deposit
  630. last_deposit_state_formated = pformat(last_deposit_state, indent=2)
  631. logger.info(f"等待资产在交易所到账...({deposit['confirmTimes']}/{deposit['unlockConfirm']})")
  632. if Decimal(deposit['confirmTimes']) >= Decimal(deposit['unlockConfirm']):
  633. is_arrived = True
  634. if is_arrived:
  635. msg = f"资产已在交易所到账。\n{last_deposit_state_formated}"
  636. logger.info(msg)
  637. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  638. self._set_state(self.STATE_TRANSFERRING_TO_CHAIN)
  639. return
  640. time.sleep(60)
  641. waiting_times = waiting_times - 1
  642. msg = f"等待充值到账超时(超过30分钟)。\n{last_deposit_state_formated}"
  643. logger.error(msg)
  644. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  645. self._set_state(self.STATE_FAILED)
  646. except Exception as e:
  647. msg = f"查询交易所到账状态时发生错误:{e}"
  648. logger.error(msg)
  649. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  650. self._set_state(self.STATE_FAILED)
  651. traceback.print_exc()
  def _execute_transfer_to_chain(self):
      """
      Withdraw the quote asset obtained from the trade (USDT in this code)
      from the exchange back to the user's on-chain wallet.

      Submits a withdrawal of ``self.exchange_withdrawal_amount`` (truncated
      to a whole number) of USDT on the ``ETH`` network to
      ``self.user_wallet``.  When the exchange response contains an ``"id"``,
      it is stored in ``self.exchange_withdrawal_id`` and the state machine
      moves to ``STATE_WAITING_WITHDRAWAL_CONFIRM``.  A response without an
      ``"id"``, or any exception, moves it to ``STATE_FAILED``.  Every
      outcome is logged and appended to the process item's state flow.
      """
      start_note = "执行:交易所计价资产转账回链上..."
      logger.info(start_note)
      add_state_flow_entry(self.process_item, self.current_state, start_note, "pending")
      try:
          # The fractional part of the amount is deliberately discarded:
          # float -> int truncates, then the API receives it as a string.
          whole_amount = int(float(self.exchange_withdrawal_amount))
          request = dict(
              coin='USDT',
              netWork='ETH',
              address=self.user_wallet,
              amount=str(whole_amount),
          )
          request_dump = pformat(request, indent=2)
          response = mexc.wallet.post_withdraw(request)
          response_dump = pformat(response, indent=2)

          if "id" in response:
              # Remember the withdrawal id so a later step can track it.
              self.exchange_withdrawal_id = response["id"]
              report = f"交易所提现已发送\n{response_dump}"
              logger.info(report)
              add_state_flow_entry(self.process_item, self.current_state, report, "success")
              self._set_state(self.STATE_WAITING_WITHDRAWAL_CONFIRM)
              return

          # No "id" in the response: treat the withdrawal request as rejected.
          report = f"交易所提现失败\n參數: {request_dump}\n響應: {response_dump}"
          logger.error(report)
          add_state_flow_entry(self.process_item, self.current_state, report, "fail")
          self._set_state(self.STATE_FAILED)
      except Exception as err:
          report = f"转账回链上失败: {err}"
          logger.error(report)
          add_state_flow_entry(self.process_item, self.current_state, report, "fail")
          self._set_state(self.STATE_FAILED)
          traceback.print_exc()
  def _wait_withdrawal_confirm(self):
      """
      Wait for the exchange withdrawal to be confirmed on chain.

      Polls the exchange's withdrawal list up to 60 times, sleeping 30
      seconds after each unsuccessful poll (about 30 minutes in total), and
      looks for the entry whose ``id`` equals ``self.exchange_withdrawal_id``.

      State transitions:
        * entry found with ``status == 7``      -> ``STATE_COMPLETED``
        * withdrawal list response not a list   -> ``STATE_FAILED``
        * all polls exhausted                   -> ``STATE_FAILED``
        * any exception                         -> ``STATE_FAILED``

      Every outcome is logged and appended to the process item's state flow.
      """
      exchange_withdrawal_id = self.exchange_withdrawal_id
      msg = f"等待交易所提现确认:{exchange_withdrawal_id}"
      logger.info(msg)
      add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
      try:
          is_arrived = False
          # Wait at most 30 minutes: 60 polls, 30 seconds apart.
          waiting_times = 60
          # Pretty-printed copy of the most recent matching withdrawal record,
          # kept so that the success/timeout message can show it.
          last_withdrawal_state_formated = None
          while waiting_times > 0:
              withdrawal_list = mexc.wallet.get_withdraw_list()
              if not isinstance(withdrawal_list, list):
                  # Anything other than a list is treated as an error reply.
                  msg = f"查询交易所提现状态时发生错误:{withdrawal_list}"
                  logger.error(msg)
                  add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                  # Use the class constant like every other failure path,
                  # instead of a hard-coded "FAILED" literal.
                  self._set_state(self.STATE_FAILED)
                  return
              for withdrawal in withdrawal_list:
                  if withdrawal['id'] != exchange_withdrawal_id:
                      continue
                  last_withdrawal_state_formated = pformat(withdrawal, indent=2)
                  # NOTE(review): 7 is presumably the exchange's "success"
                  # status code - confirm against the MEXC API reference.
                  if withdrawal['status'] == 7:
                      is_arrived = True
              if is_arrived:
                  msg = f"提现请求已上链:\n{last_withdrawal_state_formated}"
                  logger.info(msg)
                  add_state_flow_entry(self.process_item, self.current_state, msg, "success")
                  self._set_state(self.STATE_COMPLETED)
                  return
              time.sleep(30)
              waiting_times = waiting_times - 1
          # All polls used up without seeing the confirmed status.
          msg = f"等待提现到账超时(超过30分钟):\n{last_withdrawal_state_formated}"
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
          self._set_state(self.STATE_FAILED)
      except Exception as e:
          msg = f"查询交易所提现状态时发生错误:{e}"
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
          self._set_state(self.STATE_FAILED)
          traceback.print_exc()
  # Usage example (the author calls it pseudo-code): how to drive this class.
  if __name__ == "__main__":
      import ok_chain_client
      import pprint

      CHAIN_ID = 1
      FROM_TOKEN = '0xdAC17F958D2ee523a2206206994597C13D831ec7'
      FROM_TOKEN_AMOUNT_HUMAM = Decimal('20')
      FROM_TOKEN_DECIMAL = 6
      TO_TOKEN = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860'
      # Empty in this example; presumably must be filled in before running.
      USER_WALLET = ''
      USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
      SYMBOL = "RATO_USDT"

      # Request a swap quote.  NOTE: the exchange deposit address is passed
      # directly as the receiver, which skips a separate transfer step.
      data = ok_chain_client.swap(
          CHAIN_ID,
          # Human-readable amount converted to the token's atomic units.
          FROM_TOKEN_AMOUNT_HUMAM * (10 ** FROM_TOKEN_DECIMAL),
          FROM_TOKEN,
          TO_TOKEN,
          1,
          USER_WALLET,
          USER_EXCHANGE_WALLET,  # exchange address used directly as receiver
      )
      if data.get('code') != '0' or not data.get('data'):
          pprint.pprint(data)
          error_text = f"OK API错误({1}) - Code:{data.get('code', 'N/A')}, Msg:{data.get('msg', data.get('message', 'N/A')) if isinstance(data, dict) else '格式错误'}"
          pprint.pprint({"error": error_text})
          # Carry the diagnostic in the exception instead of an empty message.
          raise Exception(error_text)

      d = data['data'][0]
      tx = d['tx']
      router_result = d['routerResult']
      in_dec = int(router_result['fromToken']['decimal'])
      out_dec = int(router_result['toToken']['decimal'])
      atomic_in_base = Decimal(router_result['fromTokenAmount'])
      atomic_out_target = Decimal(router_result['toTokenAmount'])
      # Convert the quoted atomic amounts back to human-readable amounts.
      human_in_base = atomic_in_base / (10 ** in_dec)
      human_out_target = atomic_out_target / (10 ** out_dec)
      FROM_TOKEN_AMOUNT_HUMAM = human_in_base
      TO_TOKEN_AMOUNT_HUMAM = human_out_target
      pprint.pprint(tx)

      # Run the arbitrage process.
      process_item = {
          "stateFlow": [],  # state transition records
      }
      # The meaning of the positional 2 and 1.2 is defined by
      # ArbitrageProcess.__init__, which is not visible in this section.
      ap = ArbitrageProcess(tx, 2, 1.2,
                            FROM_TOKEN, TO_TOKEN,
                            FROM_TOKEN_AMOUNT_HUMAM, TO_TOKEN_AMOUNT_HUMAM,
                            USER_EXCHANGE_WALLET, USER_WALLET,
                            SYMBOL, process_item)
      # Normally the flow starts from this state; when testing, change it to
      # exercise later steps.
      # NOTE(review): every other state constant used here carries a STATE_
      # prefix - confirm that SELLING_ON_EXCHANGE exists on the class.
      ap._set_state(ap.SELLING_ON_EXCHANGE)

      # Main loop: keep stepping until a terminal state is reached.
      while ap.current_state not in (ap.STATE_COMPLETED, ap.STATE_FAILED):
          ap.run_arbitrage_step()
          if ap.current_state in (ap.STATE_WAITING_TRANSFER_ARRIVE,
                                  ap.STATE_WAITING_WITHDRAWAL_CONFIRM):
              time.sleep(10)

      logger.info(process_item)
      if ap.current_state == ap.STATE_COMPLETED:
          logger.info("套利流程执行成功!")
      else:
          logger.info("套利流程执行失败!")