arbitrage_process.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
  1. import time
  2. import logging
  3. import datetime
  4. from web3_py_client import EthClient
  5. from mexc_client import MexcClient
  6. from decimal import Decimal, ROUND_DOWN
  # Shared module-level clients used by the functions and class below:
  # `web3` for on-chain calls and `mexc` for the MEXC exchange API.
  web3 = EthClient()
  mexc = MexcClient()

  # Configure root logging: INFO level, "<time> - <level> - <message>" lines.
  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(levelname)s - %(message)s',
  )
  def get_formatted_timestamp():
      """Return the current local time as ``YYYY-MM-DD HH:MM:SS,mmm``.

      The part after the comma is the millisecond component, zero-padded to
      three digits, e.g. ``2025-05-16 14:44:09,324``.
      """
      current = datetime.datetime.now()
      # Truncate (not round) microseconds down to whole milliseconds.
      millis = current.microsecond // 1000
      return "{},{:03d}".format(current.strftime("%Y-%m-%d %H:%M:%S"), millis)
  def add_state_flow_entry(process_item, state_name, msg, status_val="pending"):
      """Append one record to ``process_item["stateFlow"]`` and update its state.

      Args:
          process_item: Dict that already holds a ``"stateFlow"`` list; it is
              mutated in place.
          state_name: Name of the state this record belongs to. It is also
              stored as ``process_item["currentState"]``.
          msg: Human-readable message for the record.
          status_val: One of "pending", "success", "fail" or "skipped".
      """
      process_item["stateFlow"].append(
          {
              "stateName": state_name,
              "timestamp": get_formatted_timestamp(),
              "msg": msg,
              "status": status_val,
          }
      )
      # Keep the overall state in sync with the most recent record.
      process_item["currentState"] = state_name
      # Nothing is logged here; the callers in this file log the message
      # themselves before recording it.
  class ArbitrageProcess:
      """State machine for one arbitrage round: sell on the exchange, buy on chain.

      The happy path implemented below is::

          CHECK -> SELLING_ON_EXCHANGE -> WAITING_SELL_CONFIRM -> BUYING_ON_CHAIN
                -> WAITING_CHAIN_CONFIRM -> WAITING_TRANSFER_ARRIVE
                -> TRANSFERRING_TO_CHAIN -> WAITING_WITHDRAWAL_CONFIRM -> COMPLETED

      ``REJECT`` is reached when the pre-flight check refuses the trade,
      ``WAITING_EXCHANGE_ROLLBACK`` when the on-chain buy fails after the
      exchange sell already happened, and ``FAILED`` for every other error.

      Every step logs a message and records it in ``process_item["stateFlow"]``
      through ``add_state_flow_entry``. The module-level ``web3`` and ``mexc``
      clients perform all network calls.
      """

      # State names. They are class attributes so they can be read both as
      # ``self.STATE_X`` and as ``ArbitrageProcess.STATE_X``.
      STATE_IDLE = "IDLE"
      STATE_CHECK = "CHECK"
      STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE"
      STATE_WAITING_SELL_CONFIRM = "WAITING_SELL_CONFIRM"
      STATE_BUYING_ON_CHAIN = "BUYING_ON_CHAIN"
      STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM"
      STATE_WAITING_EXCHANGE_ROLLBACK = "WAITING_EXCHANGE_ROLLBACK"
      STATE_WAITING_TRANSFER_ARRIVE = "WAITING_TRANSFER_ARRIVE"
      STATE_TRANSFERRING_TO_CHAIN = "TRANSFERRING_TO_CHAIN"
      STATE_WAITING_WITHDRAWAL_CONFIRM = "WAITING_WITHDRAWAL_CONFIRM"
      STATE_COMPLETED = "COMPLETED"
      STATE_REJECT = "REJECT"
      STATE_FAILED = "FAILED"

      # States accepted by _set_state. "IDLE" is only the initial value and
      # cannot be re-entered.
      STATES = [
          "CHECK",                      # balance checks, gas estimation
          "SELLING_ON_EXCHANGE",        # placing the spot sell order
          "WAITING_SELL_CONFIRM",       # waiting for the sell order to fill
          "BUYING_ON_CHAIN",            # sending the on-chain buy transaction
          "WAITING_CHAIN_CONFIRM",      # waiting for the on-chain receipt
          "WAITING_EXCHANGE_ROLLBACK",  # buying back on the exchange
          "WAITING_TRANSFER_ARRIVE",    # waiting for the exchange deposit
          "TRANSFERRING_TO_CHAIN",      # requesting the exchange withdrawal
          "WAITING_WITHDRAWAL_CONFIRM", # waiting for the withdrawal
          "COMPLETED",                  # flow finished successfully
          "REJECT",                     # refused by the pre-flight check
          "FAILED",                     # flow failed
      ]

      def __init__(self, tx, gas_limit_multiplier, gas_price_multiplier,
                   from_token, to_token,
                   from_token_amount_human, exchange_out_amount,
                   user_exchange_wallet, user_wallet,
                   symbol, process_item):
          """Initialise one arbitrage round.

          Args:
              tx: Transaction dict to send on chain. It is mutated in place:
                  the keys ``gasPrice``, ``value``, ``minReceiveAmount``,
                  ``slippage``, ``maxSpendAmount`` and ``signatureData`` are
                  removed, and the check step later overwrites the fee fields.
              gas_limit_multiplier: Factor applied to the estimated gas limit.
              gas_price_multiplier: Factor applied to ``maxPriorityFeePerGas``.
              from_token: Address of the token spent on chain.
              to_token: Address of the token bought on chain.
              from_token_amount_human: Amount of ``from_token`` spent, in human
                  units; stored as a Decimal under ``chain_usdt_use``.
              exchange_out_amount: Amount of the coin sold on the exchange.
              user_exchange_wallet: Address whose ``to_token`` balance is
                  polled to measure how much was bought.
              user_wallet: Address whose ETH balance pays for gas; also the
                  destination of the final exchange withdrawal.
              symbol: Trading pair written as ``"COIN_BASE"``.
              process_item: Dict with a ``"stateFlow"`` list that receives the
                  step records.
          """
          for dropped_key in ('gasPrice', 'value', 'minReceiveAmount',
                              'slippage', 'maxSpendAmount', 'signatureData'):
              tx.pop(dropped_key, None)
          self.tx = tx
          self.gas_limit_multiplier = gas_limit_multiplier
          self.gas_price_multiplier = gas_price_multiplier
          self.from_token_addr = from_token
          self.to_token_addr = to_token
          self.user_exchange_wallet = user_exchange_wallet
          self.user_wallet = user_wallet
          self.symbol = symbol
          symbol_parts = symbol.split('_')
          self.coin = symbol_parts[0]
          self.base_coin = symbol_parts[1]
          self.process_item = process_item
          self.sell_price = Decimal(0)
          self.buy_price = Decimal(0)
          # Details of the current round (amounts, prices, ids).
          self.arbitrage_details = {
              "chain_buy_tx_hash": None,         # hash of the on-chain buy
              # Base-coin amount spent on chain (TODO in original: fixed value).
              "chain_usdt_use": Decimal(from_token_amount_human),
              "chain_amount_before_trade": 0,
              "chain_amount_after_trade": 0,
              "chain_buy_amount": Decimal('0'),  # coin amount bought on chain
              "chain_buy_price": None,           # on-chain buy price (unused)
              "chain_withdrawal_tx_hash": None,  # tx moving coins to exchange
              "exchange_out_amount": Decimal(exchange_out_amount),
              "exchange_sell_order_id": None,
              "exchange_withdraw_id": None,
              "exchange_withdraw_amount": None,  # quote amount to withdraw
          }
          self.current_state = self.STATE_IDLE

      # ------------------------------------------------------------------
      # helpers
      # ------------------------------------------------------------------
      def _record_step(self, msg, status, level=logging.INFO):
          """Log ``msg`` at ``level`` and record it under the current state."""
          logging.log(level, msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, status)

      def _pair_symbol(self):
          """Return the pair in exchange form, e.g. "RATO_USDT" -> "RATOUSDT"."""
          return self.symbol.replace('_', '')

      def _set_state(self, state):
          """Switch to ``state`` if it is listed in ``STATES``; otherwise log an error."""
          if state in self.STATES:
              logging.info(f"状态变更:{self.current_state} -> {state}")
              logging.info('')
              self.current_state = state
          else:
              logging.error(f"尝试设置无效状态:{state}")

      def run_arbitrage_step(self):
          """Run the handler for the current state.

          Meant to be called repeatedly from a driver loop. Terminal states
          only log and record their final message; any other state (including
          "IDLE") without a handler is a no-op.
          """
          handlers = {
              self.STATE_CHECK: self._execute_check,
              self.STATE_SELLING_ON_EXCHANGE: self._execute_sell_on_exchange,
              self.STATE_WAITING_SELL_CONFIRM: self._wait_sell_confirm,
              self.STATE_BUYING_ON_CHAIN: self._execute_buy_on_chain,
              self.STATE_WAITING_CHAIN_CONFIRM: self._wait_chain_confirm,
              self.STATE_WAITING_EXCHANGE_ROLLBACK: self._wait_exchange_rollback,
              self.STATE_WAITING_TRANSFER_ARRIVE: self._wait_transfer_arrive,
              self.STATE_TRANSFERRING_TO_CHAIN: self._execute_transfer_to_chain,
              self.STATE_WAITING_WITHDRAWAL_CONFIRM: self._wait_withdrawal_confirm,
          }
          handler = handlers.get(self.current_state)
          if handler is not None:
              handler()
              return

          terminal_messages = {
              self.STATE_COMPLETED: ("套利流程成功完成!", "success", logging.INFO),
              self.STATE_REJECT: ("套利流程被程序拒绝", "fail", logging.ERROR),
              self.STATE_FAILED: ("套利流程失败!", "fail", logging.ERROR),
          }
          terminal = terminal_messages.get(self.current_state)
          if terminal is not None:
              msg, status, level = terminal
              self._record_step(msg, status, level)

      # ------------------------------------------------------------------
      # state handlers
      # ------------------------------------------------------------------
      def _execute_check(self):
          """Pre-flight checks; moves to SELLING_ON_EXCHANGE or REJECT.

          Checks, in order: the exchange holds enough of the coin to sell, gas
          can be estimated, the estimated fee is below a fixed limit, and the
          wallet keeps a small ETH margin after paying that fee. Any exception
          rejects the trade.
          """
          try:
              # Step 1: exchange balance must cover the amount to sell
              # (truncated to a whole number, like the order itself).
              amount_to_sell = self.arbitrage_details["exchange_out_amount"].quantize(
                  Decimal('1'), rounding=ROUND_DOWN)
              balances = mexc.trade.get_account_info()['balances']
              for balance in balances:
                  if balance['asset'] != self.coin:
                      continue
                  if Decimal(balance['free']) < amount_to_sell:
                      self._record_step(
                          f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{amount_to_sell}, 不能触发套保交易。",
                          "fail")
                      self._set_state(self.STATE_REJECT)
                      return
                  self._record_step(
                      f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{amount_to_sell}, 余额校验通过(可以套保)。",
                      "success")
                  break
              else:
                  # The coin is not listed at all. This is treated as a zero
                  # balance - presumably the API omits empty assets; verify
                  # against the exchange documentation.
                  self._record_step(
                      f"交易所剩余{self.coin}: 0, 交易所准备卖出:{amount_to_sell}, 不能触发套保交易。",
                      "fail")
                  self._set_state(self.STATE_REJECT)
                  return

              # Step 2: fee fields and gas estimate.
              latest_block = web3.w3.eth.get_block('latest')
              self.tx['maxPriorityFeePerGas'] = int(
                  int(self.tx['maxPriorityFeePerGas']) * self.gas_price_multiplier)
              self.tx['maxFeePerGas'] = int(
                  int(latest_block['baseFeePerGas']) * 2 + self.tx['maxPriorityFeePerGas'])
              estimated_gas_origin = web3.w3.eth.estimate_gas(self.tx)
              estimated_gas = int(estimated_gas_origin * self.gas_limit_multiplier)
              # NOTE(review): maxFeePerGas already contains the priority fee, so
              # this sum counts it twice. Kept as-is because the thresholds
              # below were chosen against this (over)estimate.
              estimated_wei = estimated_gas * (
                  self.tx['maxPriorityFeePerGas'] + self.tx['maxFeePerGas'])
              estimated_eth = estimated_wei / (10 ** 18)
              self._record_step(
                  f"估算的燃气量: {estimated_gas}, eth消耗: {estimated_eth},gas估算通過",
                  "success")

              # Step 3: fee must stay below a fixed ETH limit.
              LIMIT = 0.005
              if estimated_eth > LIMIT:
                  self._record_step(
                      f"TODO: 費用判斷不通過,費用還是固定值,建議改成與ETH價格挂鈎的值,方便設定利潤!LIMIT:{LIMIT}, estimated_eth: {estimated_eth}",
                      "fail")
                  self._set_state(self.STATE_REJECT)
                  return
              self._record_step(
                  f"TODO: 費用判斷通過,費用還是固定值,建議改成與ETH價格挂鈎的值,方便設定利潤!LIMIT:{LIMIT}, estimated_eth: {estimated_eth}",
                  "success")

              # Step 4: keep at least MARGIN ETH in the wallet after the fee.
              MARGIN = 0.001
              eth_balance_origin = web3.w3.eth.get_balance(self.user_wallet)
              eth_balance = eth_balance_origin / (10 ** 18)
              if eth_balance - estimated_eth < MARGIN:
                  self._record_step(
                      f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}",
                      "fail")
                  self._set_state(self.STATE_REJECT)
                  return
              self._record_step(
                  f"gas餘額判斷通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}",
                  "success")

              # All checks passed: start trading.
              self._set_state(self.STATE_SELLING_ON_EXCHANGE)
          except Exception as e:
              self._record_step(f"前置檢查未通過:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_REJECT)

      def _execute_sell_on_exchange(self):
          """Place a market sell order on the exchange.

          Moves to WAITING_SELL_CONFIRM when the response carries an
          ``orderId``, otherwise to FAILED.
          """
          self._record_step("执行:中心化交易所卖出现货...", "pending")
          try:
              # The configured amount is sold, truncated to a whole number.
              amount_to_sell = self.arbitrage_details["exchange_out_amount"].quantize(
                  Decimal('1'), rounding=ROUND_DOWN)
              # An on-chain balance check for the from-token used to run here
              # and is currently disabled.
              order_params = {
                  "symbol": self._pair_symbol(),
                  "side": "SELL",
                  "type": "MARKET",
                  "quantity": int(amount_to_sell),
              }
              logging.info(order_params)
              exchange_sell_order = mexc.trade.post_order(order_params)
              if 'orderId' not in exchange_sell_order:
                  self._record_step(
                      f"交易所现货卖出下单失败:{exchange_sell_order}", "fail", logging.ERROR)
                  self._set_state(self.STATE_FAILED)
                  return
              exchange_sell_order_id = exchange_sell_order['orderId']
              self._record_step(
                  f"交易所现货卖出订单已发送, 订单ID: {exchange_sell_order_id}", "success")
              self.arbitrage_details["exchange_sell_order_id"] = exchange_sell_order_id
              self._set_state(self.STATE_WAITING_SELL_CONFIRM)
          except Exception as e:
              self._record_step(f"交易所现货卖出下单失败:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_FAILED)

      def _wait_sell_confirm(self):
          """Poll the sell order (up to 30 times, 1 s apart) until it is done.

          On "FILLED" or "PARTIALLY_CANCELED" the sell price is derived from
          ``cummulativeQuoteQty``, that quote amount becomes the amount to
          withdraw later, and the flow moves to BUYING_ON_CHAIN. On timeout or
          error it moves to FAILED.
          """
          exchange_sell_order_id = self.arbitrage_details["exchange_sell_order_id"]
          self._record_step(f"等待交易所现货卖出订单确认:{exchange_sell_order_id}", "pending")
          last_order = None
          try:
              waiting_times = 30
              while waiting_times > 0:
                  order = mexc.trade.get_order({
                      "symbol": self._pair_symbol(),
                      "orderId": exchange_sell_order_id,
                  })
                  last_order = order
                  if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
                      money = Decimal(order['cummulativeQuoteQty'])
                      amount = self.arbitrage_details["exchange_out_amount"]
                      self.sell_price = (money / amount).quantize(
                          Decimal('1e-8'), rounding=ROUND_DOWN)
                      self._record_step(
                          f"交易所现货卖出订单已完成, 价格:{self.sell_price}。{order}", "success")
                      self.arbitrage_details["exchange_withdraw_amount"] = order['cummulativeQuoteQty']
                      self._set_state(self.STATE_BUYING_ON_CHAIN)
                      return
                  time.sleep(1)
                  waiting_times -= 1
              self._record_step(f"交易所现货卖出订单失敗, 最後狀態:{last_order}。", "fail")
              # Fixed: the original referenced a non-existent ``self.FAILED``.
              self._set_state(self.STATE_FAILED)
          except Exception as e:
              self._record_step(
                  f"查询交易所现货卖出订单状态时发生错误:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_FAILED)

      def _execute_buy_on_chain(self):
          """Send the on-chain buy transaction.

          Records the pre-trade token balance first, so the bought amount can
          be derived afterwards. Moves to WAITING_CHAIN_CONFIRM on success and
          to WAITING_EXCHANGE_ROLLBACK on any exception.
          """
          self._record_step("执行:链上买入操作...", "pending")
          try:
              self.arbitrage_details["chain_amount_before_trade"] = web3.get_erc20_balance(
                  self.to_token_addr, self.user_exchange_wallet)
              chain_buy_tx_hash = web3._sign_and_send_transaction(
                  self.tx,
                  self.gas_limit_multiplier,
              )
              self._record_step(f"链上买入交易已发送,交易哈希:{chain_buy_tx_hash}", "success")
              self.arbitrage_details["chain_buy_tx_hash"] = chain_buy_tx_hash
              self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
          except Exception as e:
              self._record_step(f"链上买入失败:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)

      def _wait_chain_confirm(self):
          """Wait for the buy receipt and work out the bought amount and price.

          A receipt with status 1 leads to WAITING_TRANSFER_ARRIVE; a failed
          receipt or any exception leads to WAITING_EXCHANGE_ROLLBACK.
          """
          tx_hash = self.arbitrage_details["chain_buy_tx_hash"]
          self._record_step(f"等待链上交易确认:{tx_hash}", "pending")
          try:
              receipt = web3.wait_for_transaction_receipt(tx_hash, timeout=300)
              if receipt.status != 1:
                  self._record_step(f"链上交易确认失败:{tx_hash}", "fail", logging.ERROR)
                  self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
                  return

              # Bought amount = balance after - balance before. Poll until the
              # balance has visibly increased.
              # NOTE(review): this poll has no timeout (the original carries a
              # TODO suggesting the exchange deposit record be used instead).
              actual_buy_amount = Decimal(0)
              while True:
                  self.arbitrage_details["chain_amount_after_trade"] = web3.get_erc20_balance(
                      self.to_token_addr, self.user_exchange_wallet)
                  actual_buy_amount = (
                      self.arbitrage_details["chain_amount_after_trade"]
                      - self.arbitrage_details["chain_amount_before_trade"])
                  if actual_buy_amount > Decimal(0):
                      break
                  time.sleep(1)

              buy_amount_human = actual_buy_amount.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
              sell_amount_human = self.arbitrage_details["chain_usdt_use"]
              self.arbitrage_details["chain_buy_amount"] = buy_amount_human
              # Guard both divisions: an arithmetic error here would otherwise
              # send a confirmed purchase into the rollback path.
              if buy_amount_human > 0:
                  self.buy_price = (sell_amount_human / buy_amount_human).quantize(
                      Decimal('1e-8'), rounding=ROUND_DOWN)
              else:
                  self.buy_price = Decimal(0)
              if self.buy_price > 0:
                  rate = (self.sell_price / self.buy_price).quantize(
                      Decimal('1e-4'), rounding=ROUND_DOWN)
              else:
                  rate = Decimal(0)
              self._record_step(
                  f"【比率{rate}】。链上交易已确认。用{sell_amount_human}买入{buy_amount_human},价格{self.buy_price}。",
                  "success")
              self._set_state(self.STATE_WAITING_TRANSFER_ARRIVE)
          except Exception as e:
              self._record_step(f"查询链上确认状态时发生错误:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)

      def _wait_exchange_rollback(self):
          """Buy the coin back on the exchange at market price.

          Spends the quote amount received from the earlier sell, capped by
          the free base-coin balance, and polls the order up to 30 times. The
          flow always ends in FAILED, whether or not the rollback succeeded.
          """
          self._record_step("执行:中心化交易所买入现货回滚...", "pending")
          try:
              # Quote amount earmarked for withdrawal, truncated to an integer.
              amount_to_buy = Decimal(
                  self.arbitrage_details["exchange_withdraw_amount"]
              ).quantize(Decimal('1'), rounding=ROUND_DOWN)
              balances = mexc.trade.get_account_info()['balances']
              for balance in balances:
                  if balance['asset'] != self.base_coin:
                      continue
                  amount_to_buy = min(Decimal(balance['free']), amount_to_buy)
                  if amount_to_buy < Decimal('10'):
                      self._record_step(
                          f"交易所剩余{self.base_coin}: {balance['free']}, 小于10, 不能触发回滚交易。",
                          "fail")
                      self._set_state(self.STATE_FAILED)
                      return
                  self._record_step(
                      f"交易所剩余{self.base_coin}: {balance['free']}, 交易所准备使用:{amount_to_buy}, 余额校验通过(可以回滚)。",
                      "success")
                  break

              order_params = {
                  "symbol": self._pair_symbol(),
                  "side": "BUY",
                  "type": "MARKET",
                  "quoteOrderQty": int(amount_to_buy),
              }
              logging.info(order_params)
              exchange_buy_order = mexc.trade.post_order(order_params)
              if 'orderId' not in exchange_buy_order:
                  self._record_step(
                      f"【回滚】交易所现货买入下单失败:{exchange_buy_order}", "fail", logging.ERROR)
                  self._set_state(self.STATE_FAILED)
                  return
              exchange_buy_order_id = exchange_buy_order['orderId']
              self._record_step(
                  f"【回滚】交易所现货买入订单已发送, 订单ID: {exchange_buy_order_id}", "success")

              # Fixed: the original looped on ``while True`` and never checked
              # the counter, so the timeout branch below was unreachable.
              waiting_times = 30
              last_query_rst = None
              while waiting_times > 0:
                  order = mexc.trade.get_order({
                      "symbol": self._pair_symbol(),
                      "orderId": exchange_buy_order_id,
                  })
                  last_query_rst = order
                  if order['status'] == "FILLED":
                      money = Decimal(order['cummulativeQuoteQty'])
                      amount = self.arbitrage_details["exchange_out_amount"]
                      price = (money / amount).quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
                      self._record_step(
                          f"【回滚】交易所现货买入订单已完全成交, 价格:{price}。{order}", "success")
                      # The rollback worked, but the arbitrage itself failed.
                      self._set_state(self.STATE_FAILED)
                      return
                  time.sleep(1)
                  waiting_times -= 1
              self._record_step(
                  f"【回滚】回滚交易订单查询超时, 订单ID: {exchange_buy_order_id},最终状态:{last_query_rst}",
                  "fail")
              self._set_state(self.STATE_FAILED)
          except Exception as e:
              self._record_step(f"【回滚】交易所回滚交易失败:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_FAILED)

      def _wait_transfer_arrive(self):
          """Wait until the bought coins are usable on the exchange.

          Phase 1 (up to 10 polls, 60 s apart): once the deposit with the buy
          transaction hash is listed, proceed early if the free coin balance
          exceeds the total amount of still-unconfirmed deposits of that coin.
          Phase 2 (up to 30 polls, 60 s apart): wait until that deposit reaches
          its required confirmation count. Moves to TRANSFERRING_TO_CHAIN on
          success and FAILED on timeout or error.
          """
          self._record_step(f"等待资产在交易所到账...", "pending")
          try:
              buy_tx_hash = self.arbitrage_details['chain_buy_tx_hash']

              # Phase 1: early-exit check.
              waiting_times = 10
              last_deposit_state = None
              while waiting_times > 0:
                  time.sleep(60)
                  deposit_list = mexc.wallet.get_deposit_list()
                  is_listed = False           # is our deposit in the list yet?
                  pending_amount = Decimal(0) # coin amount still unconfirmed
                  for deposit in deposit_list:
                      if deposit['coin'] != self.coin:
                          continue
                      if Decimal(deposit['confirmTimes']) < Decimal(deposit['unlockConfirm']):
                          pending_amount = pending_amount + Decimal(deposit['amount'])
                      if deposit['transHash'] != buy_tx_hash:
                          continue
                      last_deposit_state = deposit
                      is_listed = True
                  if is_listed:
                      balances = mexc.trade.get_account_info()['balances']
                      asset_balance = 0
                      for balance in balances:
                          if balance['asset'] == self.coin:
                              asset_balance = Decimal(balance['free'])
                      if asset_balance > pending_amount:
                          self._record_step(
                              f"【flash】资产可以進行快速提現。{last_deposit_state}", "success")
                          self._set_state(self.STATE_TRANSFERRING_TO_CHAIN)
                          return
                  logging.info(f"正在檢查快速提現條件...({waiting_times}/10)")
                  waiting_times -= 1

              # Phase 2: wait for full confirmation, at most 30 minutes.
              is_arrived = False
              waiting_times = 30
              last_deposit_state = None
              while waiting_times > 0:
                  deposit_list = mexc.wallet.get_deposit_list()
                  for deposit in deposit_list:
                      if deposit['transHash'] != buy_tx_hash:
                          continue
                      last_deposit_state = deposit
                      logging.info(
                          f"等待资产在交易所到账...({deposit['confirmTimes']}/{deposit['unlockConfirm']})")
                      if Decimal(deposit['confirmTimes']) >= Decimal(deposit['unlockConfirm']):
                          is_arrived = True
                  if is_arrived:
                      self._record_step(f"资产已在交易所到账。{last_deposit_state}", "success")
                      self._set_state(self.STATE_TRANSFERRING_TO_CHAIN)
                      return
                  time.sleep(60)
                  waiting_times -= 1
              self._record_step(
                  f"等待充值到账超时(超过30分钟): {last_deposit_state}", "fail", logging.ERROR)
              self._set_state(self.STATE_FAILED)
          except Exception as e:
              self._record_step(f"查询交易所到账状态时发生错误:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_FAILED)

      def _execute_transfer_to_chain(self):
          """Request a USDT withdrawal over the ETH network to ``user_wallet``.

          The amount is the stored quote amount truncated to an integer. Moves
          to WAITING_WITHDRAWAL_CONFIRM when the response carries an ``id``,
          otherwise to FAILED.
          """
          self._record_step("执行:交易所计价资产转账回链上...", "pending")
          try:
              withdraw_amount = str(int(float(self.arbitrage_details["exchange_withdraw_amount"])))
              withdraw_params = {
                  'coin': 'USDT',
                  'netWork': 'ETH',
                  'address': self.user_wallet,
                  'amount': withdraw_amount,
              }
              withdraw_rst = mexc.wallet.post_withdraw(withdraw_params)
              if "id" not in withdraw_rst:
                  # Fixed: the original logged the failure and then read
                  # ``withdraw_rst["id"]`` anyway, raising KeyError.
                  logging.error(withdraw_params)
                  self._record_step(f"提现失败:{withdraw_rst}", "fail", logging.ERROR)
                  self._set_state(self.STATE_FAILED)
                  return
              exchange_withdrawal_id = withdraw_rst["id"]
              self._record_step(
                  f"交易所提现已发送, 提现ID: {exchange_withdrawal_id}", "success")
              # Store under the key declared in __init__. The misspelled legacy
              # key is still written for any reader that relies on it.
              self.arbitrage_details["exchange_withdraw_id"] = exchange_withdrawal_id
              self.arbitrage_details["exchange_withdrawl_id"] = exchange_withdrawal_id
              self._set_state(self.STATE_WAITING_WITHDRAWAL_CONFIRM)
          except Exception as e:
              self._record_step(f"转账回链上失败:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_FAILED)

      def _wait_withdrawal_confirm(self):
          """Poll the withdrawal list until our withdrawal reports status 7.

          Polls up to 60 times, 30 s apart (about 30 minutes). Moves to
          COMPLETED on success and to FAILED on timeout, on a non-list
          response, on a missing withdrawal id, or on any exception.
          """
          details = self.arbitrage_details
          exchange_withdraw_id = (
              details.get("exchange_withdraw_id") or details.get("exchange_withdrawl_id"))
          self._record_step(f"等待交易所提现确认:{exchange_withdraw_id}", "pending")
          try:
              if exchange_withdraw_id is None:
                  # Nothing to wait for; fail fast instead of polling for None.
                  self._record_step(
                      f"查询交易所提现状态时发生错误:{exchange_withdraw_id}", "fail", logging.ERROR)
                  self._set_state(self.STATE_FAILED)
                  return

              is_arrived = False
              waiting_times = 60
              last_withdraw_state = None
              while waiting_times > 0:
                  withdraw_list = mexc.wallet.get_withdraw_list()
                  if not isinstance(withdraw_list, list):
                      self._record_step(
                          f"查询交易所提现状态时发生错误:{withdraw_list}", "fail", logging.ERROR)
                      self._set_state(self.STATE_FAILED)
                      return
                  for withdraw in withdraw_list:
                      if withdraw['id'] != exchange_withdraw_id:
                          continue
                      last_withdraw_state = withdraw
                      # Status 7 is treated as "sent on chain" (see the success
                      # message); confirm the code against the exchange docs.
                      if withdraw['status'] == 7:
                          is_arrived = True
                  if is_arrived:
                      self._record_step(f"提现请求已上链: {last_withdraw_state}", "success")
                      self._set_state(self.STATE_COMPLETED)
                      return
                  time.sleep(30)
                  waiting_times -= 1
              self._record_step(
                  f"等待提现到账超时(超过30分钟): {last_withdraw_state}", "fail", logging.ERROR)
              self._set_state(self.STATE_FAILED)
          except Exception as e:
              self._record_step(f"查询交易所提现状态时发生错误:{e}", "fail", logging.ERROR)
              self._set_state(self.STATE_FAILED)
  # Example driver: fetch a swap quote, then run one arbitrage round to the end.
  if __name__ == "__main__":
      import pprint

      import ok_chain_client

      CHAIN_ID = 1
      FROM_TOKEN = '0xdAC17F958D2ee523a2206206994597C13D831ec7'
      FROM_TOKEN_AMOUNT_HUMAM = Decimal('20')
      FROM_TOKEN_DECIMAL = 6
      TO_TOKEN = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860'
      USER_WALLET = '0xb1f33026Db86a86372493a3B124d7123e9045Bb4'
      USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
      SYMBOL = "RATO_USDT"

      # Request a quote. NOTE: the exchange deposit address is passed as the
      # swap recipient, so no separate on-chain transfer step is needed.
      data = ok_chain_client.swap(CHAIN_ID,
                                  FROM_TOKEN_AMOUNT_HUMAM * (10 ** FROM_TOKEN_DECIMAL),
                                  FROM_TOKEN,
                                  TO_TOKEN,
                                  1,
                                  USER_WALLET,
                                  USER_EXCHANGE_WALLET,  # recipient = exchange address
                                  )
      if data.get('code') != '0' or not data.get('data'):
          error_text = f"OK API错误({1}) - Code:{data.get('code', 'N/A')}, Msg:{data.get('msg', data.get('message', 'N/A')) if isinstance(data, dict) else '格式错误'}"
          pprint.pprint(data)
          pprint.pprint({"error": error_text})
          # Fixed: the original raised Exception("") with no message.
          raise RuntimeError(error_text)

      d = data['data'][0]
      tx = d['tx']
      router_result = d['routerResult']
      in_dec = int(router_result['fromToken']['decimal'])
      out_dec = int(router_result['toToken']['decimal'])
      atomic_in_base = Decimal(router_result['fromTokenAmount'])
      atomic_out_target = Decimal(router_result['toTokenAmount'])
      # Convert atomic token units to human units using each token's decimals.
      FROM_TOKEN_AMOUNT_HUMAM = atomic_in_base / (10 ** in_dec)
      TO_TOKEN_AMOUNT_HUMAM = atomic_out_target / (10 ** out_dec)
      pprint.pprint(tx)

      # Run the arbitrage flow.
      process_item = {
          "stateFlow": [],  # state transition records
      }
      ap = ArbitrageProcess(tx, 2, 1.2,
                            FROM_TOKEN, TO_TOKEN,
                            FROM_TOKEN_AMOUNT_HUMAM, TO_TOKEN_AMOUNT_HUMAM,
                            USER_EXCHANGE_WALLET, USER_WALLET,
                            SYMBOL, process_item)
      # The flow normally starts here; change the start state to test later
      # steps. Fixed: the original used the non-existent
      # ``ap.SELLING_ON_EXCHANGE`` attribute.
      ap._set_state(ap.STATE_SELLING_ON_EXCHANGE)

      # Step the state machine until a terminal state is reached. REJECT is
      # included so that a rejected run cannot spin forever.
      TERMINAL_STATES = ("COMPLETED", "FAILED", "REJECT")
      while ap.current_state not in TERMINAL_STATES:
          ap.run_arbitrage_step()
          if ap.current_state in (ap.STATE_WAITING_TRANSFER_ARRIVE,
                                  ap.STATE_WAITING_WITHDRAWAL_CONFIRM):
              time.sleep(10)

      logging.info(process_item)
      if ap.current_state == "COMPLETED":
          logging.info("套利流程执行成功!")
      else:
          logging.info("套利流程执行失败!")