# s_mexc_to_erc20.py
  1. import time
  2. import traceback
  3. import copy
  4. import os
  5. from web3_py_client import EthClient
  6. from mexc_client import MexcClient
  7. from decimal import Decimal, ROUND_DOWN
  8. from as_utils import add_state_flow_entry
  9. from checker.logger_config import get_logger
  10. from pprint import pformat
  # Module-level clients shared by all ArbitrageProcess instances.
  # Primary chain client, built with the EthClient defaults.
  web3 = EthClient()
  # Second chain client, built from the RPC_URL_2 environment variable
  # (None is passed when the variable is unset).
  web3_backup = EthClient(os.environ.get("RPC_URL_2"))
  # MEXC exchange client.
  mexc = MexcClient()
  # Logger obtained from the project's logger configuration under the name 'as'.
  logger = get_logger('as')
  16. class ArbitrageProcess:
  def __init__(self, gas_limit_multiplier, gas_price_multiplier, process_item,
               core_data, core_lock,
               pending_data, pending_lock,
               mexc_data, mexc_lock,
               ):
      """Set up one arbitrage run (buy on MEXC, sell on chain).

      Copies the signal parameters onto the instance, reads the withdrawal
      settings of the coin from ``mexc_data`` and fetches the trading rules
      of the pair from MEXC. The instance is left in ``IDLE`` when all of
      that succeeded, and in ``FAILED`` (with the precision attributes left
      unset) when the trading rules could not be loaded or do not match the
      requested pair.

      Args:
          gas_limit_multiplier: Factor applied to the estimated gas limit
              (original note: usually left at 1).
          gas_price_multiplier: Factor applied to ``maxPriorityFeePerGas``
              (original note: raising it improves the inclusion rate).
          process_item: Raw parameters passed in by the signal sender. Keys
              read here: ``tx``, ``symbol``, ``cexPrice``, ``profit``,
              ``profitLimit``, ``fromToken``, ``fromTokenDecimal``,
              ``toToken``, ``toTokenDecimal``, ``userExchangeWallet``,
              ``userWallet`` and ``fromTokenAmountHuman``.
          core_data: Shared dict; ``eth_price`` is read from it here.
          core_lock: Lock held while ``core_data`` is accessed.
          pending_data: Shared dict used later to track the chain transaction.
          pending_lock: Lock held while ``pending_data`` is accessed.
          mexc_data: Shared dict; ``coin_info_map`` is read from it here.
          mexc_lock: Lock held while ``mexc_data`` is accessed.

      Raises:
          KeyError: If a required key is missing from ``process_item``,
              ``core_data`` or ``mexc_data`` (including a coin that is
              absent from ``coin_info_map``).
      """
      self.NETWORK = 'ETH'

      # Strip fields from the prepared transaction before it is estimated and
      # signed later on. NOTE: this mutates process_item['tx'] in place.
      tx = process_item['tx']
      for dropped_key in ('gasPrice', 'value', 'minReceiveAmount', 'slippage',
                          'maxSpendAmount', 'signatureData'):
          tx.pop(dropped_key, None)

      self.core_data = core_data
      self.core_lock = core_lock
      self.pending_data = pending_data
      self.pending_lock = pending_lock
      self.mexc_data = mexc_data
      self.mexc_lock = mexc_lock

      # The symbol is upper-cased and split as "<COIN>_<BASE_COIN>".
      self.symbol = process_item['symbol'].upper()
      self.coin = self.symbol.split('_')[0]
      self.base_coin = self.symbol.split('_')[1]

      with self.core_lock:
          self.eth_price = self.core_data['eth_price']

      with self.mexc_lock:
          coin_networks = self.mexc_data['coin_info_map'][self.coin]
          if self.NETWORK in coin_networks:
              self.withdraw_info = copy.deepcopy(coin_networks[self.NETWORK])
              self.WITHDRAW_FEE = Decimal(self.withdraw_info['withdrawFee'])  # withdrawal fee
              self.WITHDRAW_ENABLE = self.withdraw_info['withdrawEnable']  # whether withdrawal is enabled
              withdraw_info_formated = pformat(self.withdraw_info, indent=2)
              logger.info(f'提現信息識別, 手續費:{self.WITHDRAW_FEE}\n{withdraw_info_formated}')
          else:
              # No withdrawal settings for this network: the original author
              # assumes a cross-chain arbitrage and carries on without a fee.
              self.withdraw_info = None  # defined on both branches so later reads cannot raise AttributeError
              self.WITHDRAW_FEE = Decimal(0)
              self.WITHDRAW_ENABLE = True
              logger.info('提現信息未找到, 猜测是跨链式套利,继续执行。')

      self.tx = tx
      self.mexc_price = Decimal(process_item['cexPrice'])
      self.profit = Decimal(process_item['profit'])  # profit actually received
      self.profit_limit = Decimal(process_item['profitLimit'])  # limit for the profit actually received
      self.gas_limit_multiplier = gas_limit_multiplier
      self.gas_price_multiplier = gas_price_multiplier
      self.from_token_addr = process_item['fromToken']
      self.from_token_decimal = Decimal(process_item['fromTokenDecimal'])
      self.to_token_addr = process_item['toToken']
      self.to_token_decimal = Decimal(process_item['toTokenDecimal'])
      self.user_exchange_wallet = process_item['userExchangeWallet']
      self.user_wallet = process_item['userWallet']
      self.process_item = process_item

      # Details of the current arbitrage trade (amounts, prices, ...).
      self.sell_price = Decimal(0)
      self.sell_value = Decimal(0)
      self.buy_price = Decimal(0)
      self.buy_value = Decimal(0)
      self.chain_tx_hash = None  # hash of the on-chain sell transaction
      # Amount to buy on the exchange: the signalled amount plus the withdrawal fee.
      self.exchange_buy_amount = Decimal(process_item['fromTokenAmountHuman']) + self.WITHDRAW_FEE
      self.already_bought_amount = Decimal(0)
      self.exchange_withdrawal_id = None  # exchange withdrawal id
      self.exchange_withdrawal_amount = None  # exchange withdrawal amount
      self.actual_profit = Decimal(0)  # realised profit

      # States accepted by _set_state. "IDLE" is deliberately not listed in
      # the original, so it can only be the initial state.
      self.STATES = [
          "CHECK",                       # balance checks, gas estimation, ...
          "BUYING_ON_EXCHANGE",          # buying spot on the centralised exchange
          "WAITING_BUY_CONFIRM",         # waiting for the spot buy order to confirm
          "SELLING_ON_CHAIN",            # selling on chain
          "WAITING_CHAIN_CONFIRM",       # waiting for the chain transaction to confirm
          "WAITING_EXCHANGE_ROLLBACK",   # waiting for the exchange rollback
          "WAITING_TRANSFER_ARRIVE",     # waiting for the deposit to arrive on the exchange
          "TRANSFERRING_TO_CHAIN",       # transferring to chain
          "WAITING_WITHDRAWAL_CONFIRM",  # waiting for the withdrawal to confirm
          "COMPLETED",                   # arbitrage flow completed
          "REJECT",                      # arbitrage rejected by the program
          "FAILED"                       # arbitrage flow failed
      ]
      self.STATE_IDLE = "IDLE"
      self.STATE_CHECK = "CHECK"
      self.STATE_BUYING_ON_EXCHANGE = "BUYING_ON_EXCHANGE"
      self.STATE_SELLING_ON_CHAIN = "SELLING_ON_CHAIN"
      self.STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM"
      self.STATE_WAITING_EXCHANGE_ROLLBACK = "WAITING_EXCHANGE_ROLLBACK"
      self.STATE_WAITING_TRANSFER_ARRIVE = "WAITING_TRANSFER_ARRIVE"
      self.STATE_TRANSFERRING_TO_CHAIN = "TRANSFERRING_TO_CHAIN"
      self.STATE_WAITING_WITHDRAWAL_CONFIRM = "WAITING_WITHDRAWAL_CONFIRM"
      self.STATE_COMPLETED = "COMPLETED"
      self.STATE_REJECT = "REJECT"
      self.STATE_FAILED = "FAILED"

      # If all prerequisites load fine, the instance waits here for the start signal.
      self.current_state = self.STATE_IDLE

      # Fetch the trading rules of the pair (symbol without the underscore).
      pair = self.symbol.replace('_', '')
      exchange_info_params = {
          "symbols": pair
      }
      exchange_info_rst = mexc.market.get_exchangeInfo(exchange_info_params)

      # Exactly one symbol entry is expected in the response.
      if 'symbols' not in exchange_info_rst or len(exchange_info_rst['symbols']) != 1:
          params_formated = pformat(exchange_info_params, indent=2)
          info_formated = pformat(exchange_info_rst, indent=2)
          msg = f'获取交易规则时出现错误\n{params_formated}\n{info_formated}'
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
          self.current_state = self.STATE_FAILED
          return

      # The returned entry must describe the requested pair.
      exchange_info = exchange_info_rst['symbols'][0]
      if exchange_info['symbol'].upper() != pair:
          info_formated = pformat(exchange_info, indent=2)
          msg = f'获取到的交易规则与交易币对无关\n{info_formated}'
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
          self.current_state = self.STATE_FAILED
          return

      # Quantisation steps, e.g. for the pair RATOUSDT. The digits are read
      # into locals first: reusing the enclosing quote character inside an
      # f-string replacement field is a SyntaxError before Python 3.12.
      base_digits = exchange_info['baseAssetPrecision']
      quote_asset_digits = exchange_info['quoteAssetPrecision']
      price_digits = exchange_info['quotePrecision']
      self.coin_asset_precision = Decimal(f'1e-{base_digits}')  # precision of RATO
      self.base_coin_asset_precision = Decimal(f'1e-{quote_asset_digits}')  # precision of USDT
      self.price_precision = Decimal(f'1e-{price_digits}')  # precision of the price
  def _set_state(self, state):
      """Switch ``current_state`` to ``state`` and log the transition.

      A value that is not listed in ``self.STATES`` is rejected: an error is
      logged and ``current_state`` is left untouched.
      """
      if state not in self.STATES:
          logger.error(f"尝试设置无效状态:{state}")
          return

      previous_state = self.current_state
      logger.info(f"状态变更:{previous_state} -> {state}")
      logger.info('')
      self.current_state = state
  def run_arbitrage_step(self):
      """Execute the step that belongs to the current state.

      Intended to be called periodically (for example from a main loop).
      Working states call their handler method; the three terminal states
      only log the outcome and append it to the state flow. Any other state
      (such as ``IDLE``) is a no-op.
      """
      state = self.current_state

      # Working states paired with the name of the method that advances them.
      # The method is looked up only for the matching state.
      step_handlers = (
          (self.STATE_CHECK, '_execute_check'),
          (self.STATE_BUYING_ON_EXCHANGE, '_execute_buy_on_exchange'),
          (self.STATE_SELLING_ON_CHAIN, '_selling_on_chain'),
          (self.STATE_WAITING_CHAIN_CONFIRM, '_wait_chain_confirm'),
          (self.STATE_WAITING_EXCHANGE_ROLLBACK, '_wait_exchange_rollback'),
          (self.STATE_WAITING_TRANSFER_ARRIVE, '_wait_transfer_arrive'),
          (self.STATE_TRANSFERRING_TO_CHAIN, '_execute_transfer_to_chain'),
          (self.STATE_WAITING_WITHDRAWAL_CONFIRM, '_wait_withdrawal_confirm'),
      )
      for handled_state, method_name in step_handlers:
          if state == handled_state:
              getattr(self, method_name)()
              return

      # Terminal states: (state, message, log level, state-flow status).
      terminal_reports = (
          (self.STATE_COMPLETED, "套利流程成功完成!", 'info', "success"),
          (self.STATE_REJECT, "套利流程被程序拒绝", 'error', "fail"),
          (self.STATE_FAILED, "套利流程失败!", 'error', "fail"),
      )
      for terminal_state, msg, level, status in terminal_reports:
          if state == terminal_state:
              getattr(logger, level)(msg)
              add_state_flow_entry(self.process_item, self.current_state, msg, status)
              return
  def _execute_check(self):
      """Run the pre-trade checks and move to BUYING_ON_EXCHANGE or REJECT.

      Checks, in order:
        1. the exchange holds enough of the quote currency (2% head-room);
        2. withdrawal of the coin is enabled;
        3. gas can be estimated for the prepared transaction;
        4. the signalled profit covers the gas cost plus the withdrawal fee;
        5. the wallet keeps at least 0.002 ETH after paying the estimated gas.

      When every check passes, the estimated gas cost is deducted from the
      shared ``core_data['eth_balance']`` and the state becomes
      ``BUYING_ON_EXCHANGE``. Any failed check, and any exception, sets the
      state to ``REJECT``.
      """
      try:
          # Step 1: quote-currency balance on the exchange, with 2% head-room.
          # The factor is built from a string; Decimal(1.02) would carry the
          # binary rounding noise of the float literal.
          pseudo_value_to_buy = (self.exchange_buy_amount * self.mexc_price) * Decimal('1.02')
          # This is an amount of the quote currency, so it is rounded with the
          # quote asset's precision (the original used the coin's precision).
          pseudo_value_to_buy = pseudo_value_to_buy.quantize(
              self.base_coin_asset_precision, rounding=ROUND_DOWN)

          with self.mexc_lock:
              balances = self.mexc_data['account_info']['balances']
              for balance in balances:
                  if balance['asset'] != self.base_coin:
                      continue
                  if Decimal(balance['free']) < pseudo_value_to_buy:
                      msg = f"交易所剩余{self.base_coin}: {balance['free']}, 买入要使用:{pseudo_value_to_buy}, 不能触发交易。"
                      logger.info(msg)
                      add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                      self._set_state(self.STATE_REJECT)
                      return
                  msg = f"交易所剩余{self.base_coin}: {balance['free']}, 买入要使用:{pseudo_value_to_buy}, 余额校验通过。"
                  logger.info(msg)
                  add_state_flow_entry(self.process_item, self.current_state, msg, "success")
                  break
              else:
                  # No balance entry for the quote currency: the funds cannot
                  # be verified, so the trade is rejected instead of letting
                  # the check pass vacuously.
                  msg = f"交易所未找到{self.base_coin}余额, 买入要使用:{pseudo_value_to_buy}, 不能触发交易。"
                  logger.info(msg)
                  add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                  self._set_state(self.STATE_REJECT)
                  return

          # Withdrawal of the coin must be enabled.
          if not self.WITHDRAW_ENABLE:
              coin_info_formated = pformat(self.withdraw_info, indent=2)
              msg = f"{self.coin}不允许提现。\n{coin_info_formated}"
              logger.info(msg)
              add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
              self._set_state(self.STATE_REJECT)
              return

          # Step 2: gas estimation based on the latest block.
          logger.info("獲取區塊信息")
          with self.core_lock:
              latest_block = copy.deepcopy(self.core_data['block'])
          self.tx['maxPriorityFeePerGas'] = int(int(self.tx['maxPriorityFeePerGas']) * self.gas_price_multiplier)
          self.tx['maxFeePerGas'] = int(int(latest_block['baseFeePerGas']) * 2 + self.tx['maxPriorityFeePerGas'])
          # Sum of both fee fields; the ETH estimate below is halved again.
          gas_price = Decimal(self.tx['maxPriorityFeePerGas'] + self.tx['maxFeePerGas'])
          gas_price_gwei = (gas_price / Decimal('1e9')).quantize(Decimal('1e-9'), rounding=ROUND_DOWN)

          tx_formated = pformat(self.tx, indent=2)
          logger.info(f"鏈上各種校驗\n{tx_formated}")
          estimated_gas_origin = web3.w3.eth.estimate_gas(self.tx)
          estimated_gas = int(estimated_gas_origin * self.gas_limit_multiplier)
          estimated_wei = Decimal(estimated_gas) * gas_price
          # Original note: dividing by 2 is closer to the gas actually spent;
          # without it the cost is heavily over-estimated.
          estimated_eth = (estimated_wei / Decimal('1e18')) / Decimal(2)
          estimated_eth = estimated_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
          msg = f"估算的燃气量: {estimated_gas}, eth消耗: {estimated_eth}, gas price: {gas_price_gwei} gwei, gas估算通過"
          logger.info(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "success")

          # Step 3: the profit must cover the gas cost plus the withdrawal fee.
          estimated_eth_value = (estimated_eth * self.eth_price).quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
          cost = estimated_eth_value + self.WITHDRAW_FEE * self.mexc_price
          if self.profit < cost:
              msg = f"費用判斷不通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
              logger.info(msg)
              add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
              self._set_state(self.STATE_REJECT)
              return
          msg = f"費用判斷通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
          logger.info(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "success")

          # Step 4: keep at least 0.002 ETH in the wallet so later transactions
          # still have gas. Built from a string so the logged value is exactly 0.002.
          MARGIN = Decimal('0.002')
          with self.core_lock:
              eth_balance = self.core_data['eth_balance']
              if eth_balance - estimated_eth < MARGIN:
                  msg = f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
                  logger.info(msg)
                  add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                  self._set_state(self.STATE_REJECT)
                  return
              # Reserve the gas right away, while the lock is still held, so
              # other threads do not send transactions against a stale balance.
              self.core_data['eth_balance'] = self.core_data['eth_balance'] - estimated_eth
          msg = f"gas餘額判斷通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
          logger.info(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "success")

          # All checks passed: start trading.
          self._set_state(self.STATE_BUYING_ON_EXCHANGE)
      except Exception:
          exc_traceback = traceback.format_exc()
          msg = f"前置檢查未通過\n{exc_traceback}"
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
          self._set_state(self.STATE_REJECT)
  # The methods below implement the concrete step executed for each state.
  def _execute_buy_on_exchange(self):
      """Buy the coin on MEXC with market orders and pick the next state.

      Up to five market orders are sent until the target amount
      (``exchange_buy_amount``) is covered. After each accepted order its
      status is polled up to five times, one second apart; once the order is
      ``FILLED`` or ``PARTIALLY_CANCELED`` the executed quantity and quote
      value are added to the running totals.

      Next state:
        * ``SELLING_ON_CHAIN`` when nothing tradable is left to buy;
        * ``WAITING_EXCHANGE_ROLLBACK`` when the target is still not covered
          after the last attempt;
        * ``FAILED`` when an exception is raised.

      NOTE(review): an order that is still open after the five polls is
      abandoned without being cancelled; a late fill of it is not accounted
      for. Behaviour kept from the original.
      """
      msg = "执行:中心化交易所买入现货..."
      logger.info(msg)
      add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
      try:
          max_order_attempts = 5
          max_status_polls = 5
          pair = self.symbol.replace('_', '')

          order_times = 0
          # Both running totals are reset together: buy_price is derived
          # from their ratio.
          self.already_bought_amount = Decimal(0)
          self.buy_value = Decimal(0)

          while order_times < max_order_attempts:
              # Remaining amount, rounded down to the coin's precision.
              pseudo_amount_to_buy = (self.exchange_buy_amount - self.already_bought_amount).quantize(
                  self.coin_asset_precision, rounding=ROUND_DOWN)
              if pseudo_amount_to_buy <= 0:
                  # Nothing tradable is left; a zero-quantity order would only be rejected.
                  break
              order_times = order_times + 1

              # Whole amounts are sent as int, everything else as float
              # (original note: some APIs only accept integer quantities).
              if pseudo_amount_to_buy % 1 == 0:
                  quantity_for_api = int(pseudo_amount_to_buy)
              else:
                  quantity_for_api = float(pseudo_amount_to_buy)

              order_params = {
                  "symbol": pair,
                  "side": "BUY",
                  "type": "MARKET",
                  "quantity": quantity_for_api,  # quantity is expressed in the coin
              }
              order_params_formated = pformat(order_params, indent=2)
              exchange_buy_order = mexc.trade.post_order(order_params)
              exchange_buy_order_formated = pformat(exchange_buy_order, indent=2)
              msg = f"[{order_times}]交易所现货买入订单已发送 \n params:{order_params_formated} \n rst: {exchange_buy_order_formated}"
              logger.info(msg)
              add_state_flow_entry(self.process_item, self.current_state, msg, "success")
              if 'orderId' not in exchange_buy_order:
                  # The exchange did not accept the order; try again.
                  continue

              # Poll the order until it reaches a final state.
              exchange_buy_order_id = exchange_buy_order['orderId']
              for _ in range(max_status_polls):
                  params = {
                      "symbol": pair,
                      "orderId": exchange_buy_order_id
                  }
                  order = mexc.trade.get_order(params)
                  order_formated = pformat(order, indent=2)
                  if order['status'] in ("FILLED", "PARTIALLY_CANCELED"):
                      # Account for what was actually executed.
                      money = Decimal(order['cummulativeQuoteQty'])
                      self.already_bought_amount = self.already_bought_amount + Decimal(order['executedQty'])
                      self.buy_value = self.buy_value + money
                      # Guard against a zero executed quantity, which would
                      # otherwise abort the whole step with a division error.
                      if self.already_bought_amount > 0:
                          average_price = self.buy_value / self.already_bought_amount
                          self.buy_price = average_price.quantize(self.price_precision, rounding=ROUND_DOWN)
                      msg = f"交易所现货买入订单已完成, 价格:{self.buy_price}, money: {money}\n order: {order_formated}"
                      logger.info(msg)
                      add_state_flow_entry(self.process_item, self.current_state, msg, "success")
                      self.exchange_withdrawal_amount = self.already_bought_amount
                      break
                  msg = f"交易所现货买入订单失败\n order: {order_formated}"
                  logger.error(msg)
                  add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
                  time.sleep(1)

          # Decide from the amounts, not from the attempt counter: a purchase
          # that completes on the last attempt is still a success.
          remaining = (self.exchange_buy_amount - self.already_bought_amount).quantize(
              self.coin_asset_precision, rounding=ROUND_DOWN)
          nothing_left_to_buy = remaining <= 0
          # A positive target below the tradable precision buys nothing at all;
          # that must not count as a completed purchase.
          bought_nothing_at_all = self.exchange_buy_amount > 0 and self.already_bought_amount <= 0
          if nothing_left_to_buy and not bought_nothing_at_all:
              msg = 'mexc现货买入流程完成'
              logger.info(msg)
              add_state_flow_entry(self.process_item, self.current_state, msg, "success")
              self._set_state(self.STATE_SELLING_ON_CHAIN)
          else:
              msg = 'mexc现货买入流程失败, 重试次数大于5'
              logger.info(msg)
              add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
              self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
      except Exception:
          exc_traceback = traceback.format_exc()
          msg = f"交易所现货买入下单失败\n{exc_traceback}"
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
          self._set_state(self.STATE_FAILED)
  def _selling_on_chain(self):
      """Sign the prepared sell transaction and broadcast it through both nodes.

      The transaction takes its nonce from ``core_data['nonce']``, is signed
      once, and the same raw transaction is sent through the primary and the
      backup client. Each broadcast is attempted independently, so the
      backup node is still used when the primary one raises. Afterwards the
      shared nonce is incremented, the hash is registered in
      ``pending_data`` and the state becomes ``WAITING_CHAIN_CONFIRM``.

      On an unexpected exception the state also becomes
      ``WAITING_CHAIN_CONFIRM`` (behaviour kept from the original).

      NOTE(review): the nonce is read and incremented in two separate
      critical sections, and it is incremented even when both broadcasts
      failed. Both behaviours are kept from the original; confirm they are
      intended.
      """
      msg = "执行:链上卖出操作..."
      logger.info(msg)
      add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
      try:
          # Nonce for this transaction.
          with self.core_lock:
              self.tx['nonce'] = self.core_data['nonce']

          # Sign once; the hash is known before anything is broadcast.
          signed_tx = web3._sign(self.tx, self.gas_limit_multiplier)
          self.chain_tx_hash = web3.w3.to_hex(signed_tx.hash)

          # Primary node first, then the backup node. Each send has its own
          # handler so a failure of one node never skips the other.
          for client in (web3, web3_backup):
              try:
                  client.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
              except Exception as e:
                  msg = f"据反饋說链上卖出失败:{e}, 交易哈希:{self.chain_tx_hash}"
                  logger.error(msg)
                  add_state_flow_entry(self.process_item, self.current_state, msg, "fail")

          # Advance the shared nonce and note the current block number.
          with self.core_lock:
              self.core_data['nonce'] = self.core_data['nonce'] + 1
              block_number = self.core_data['block_number']

          # Register the hash as pending; the entry is filled in elsewhere
          # and read by the confirmation step.
          with self.pending_lock:
              self.pending_data[self.chain_tx_hash] = {
                  "block_number": block_number,
                  "tx_details": None,
                  "reponse": None,  # key spelling kept: it is part of the shared data layout
              }

          msg = f"再次確認交易是否上鏈:{self.chain_tx_hash}"
          logger.info(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "success")
          self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
      except Exception:
          exc_traceback = traceback.format_exc()
          msg = f"鏈上買入未處理的錯誤, 交易哈希:{self.chain_tx_hash}\n{exc_traceback}"
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
          self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
  def _wait_chain_confirm(self):
      """Wait for the chain transaction to confirm and report the result.

      ``pending_data[chain_tx_hash]['tx_details']`` is polled once per second
      for up to 120 seconds.

      Next state:
        * ``WAITING_EXCHANGE_ROLLBACK`` when no details arrive in time, when
          the details lack ``fromTokenDetails`` / ``toTokenDetails``, or
          when polling raises;
        * ``COMPLETED`` once the swap is confirmed.

      After a confirmed swap the sell price, the sell/buy ratio and the
      realised profit (stored in ``self.actual_profit``) are computed and
      logged. A failure inside that reporting is logged but no longer
      triggers a rollback, because the swap itself has already happened.
      """
      chain_tx_hash = self.chain_tx_hash
      msg = f"等待链上交易确认:{chain_tx_hash}"
      logger.info(msg)
      add_state_flow_entry(self.process_item, self.current_state, msg, "pending")

      try:
          # Allow 120 seconds for the confirmation.
          tx_details = None
          for _ in range(120):
              with self.pending_lock:
                  tx_details = copy.deepcopy(self.pending_data[chain_tx_hash]['tx_details'])
              if tx_details is not None:
                  break
              time.sleep(1)

          if tx_details is None:
              # Original note: without a confirmation after 120 seconds the
              # transaction most likely never made it on chain.
              with self.pending_lock:
                  response = copy.deepcopy(self.pending_data[chain_tx_hash])
              response_formated = pformat(response, indent=2)
              msg = f"链上交易确认失败:{chain_tx_hash}\n{response_formated}"
              logger.error(msg)
              add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
              self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
              return

          # Details without both token legs are treated as a failed
          # transaction and rolled back.
          if 'fromTokenDetails' not in tx_details or 'toTokenDetails' not in tx_details:
              msg = f"链上交易失敗。{tx_details}"
              logger.info(msg)
              add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
              self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
              return

          tx_details_formated = pformat(tx_details, indent=2)
          msg = f"链上交易已确认。\n details: {tx_details_formated}"
          logger.info(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "success")
      except Exception:
          exc_traceback = traceback.format_exc()
          msg = f"查询链上确认状态时发生错误\n{exc_traceback}"
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
          self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
          return

      # From here on the swap is confirmed. Statistics are best-effort: an
      # error in them must not send a completed trade into the rollback path.
      try:
          from_token_details = tx_details['fromTokenDetails']
          to_token_details = tx_details['toTokenDetails']

          # Raw token amounts converted to human units and rounded down.
          from_token_amount = Decimal(from_token_details['amount'])
          from_token_amount_human = from_token_amount / (Decimal(10) ** self.from_token_decimal)
          from_token_amount_human = from_token_amount_human.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
          to_token_amount = Decimal(to_token_details['amount'])
          to_token_amount_human = to_token_amount / (Decimal(10) ** self.to_token_decimal)
          to_token_amount_human = to_token_amount_human.quantize(self.base_coin_asset_precision, rounding=ROUND_DOWN)

          self.sell_value = to_token_amount_human
          self.sell_price = (to_token_amount_human / from_token_amount_human).quantize(
              self.price_precision, rounding=ROUND_DOWN)

          # Ratio between the on-chain sell price and the exchange buy price.
          rate = (self.sell_price / self.buy_price).quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
          msg = f"【比率{rate}】。用{from_token_amount_human}卖出{to_token_amount_human},价格{self.sell_price}。"
          logger.info(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "success")

          # Realised profit: sell value minus buy value, gas and withdrawal fee.
          value = self.sell_value - self.buy_value
          actual_gas_price = Decimal(tx_details['gasPrice'])
          actual_gas_price_gwei = (actual_gas_price / Decimal('1e9')).quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
          actual_gas_used = Decimal(tx_details['gasUsed'])
          actual_wei = actual_gas_price * actual_gas_used
          actual_eth = (actual_wei / Decimal('1e18')).quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
          actual_fee_used = (actual_eth * self.eth_price).quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
          actual_profit = value - actual_fee_used - self.WITHDRAW_FEE * self.mexc_price
          # Keep the result on the instance; the attribute is initialised in
          # __init__ but was never updated.
          self.actual_profit = actual_profit

          msg = (
              f"【最終利潤】{actual_profit}{self.base_coin}(已扣除所有手續費、滑點)"
              f"\n鏈上ETH使用: {actual_eth}({actual_fee_used} USD), gas_price: {actual_gas_price_gwei} GWEI, gas_used: {actual_gas_used}"
              f"\n交易所出售代幣利潤: {value}, 提現手續費: {self.WITHDRAW_FEE}"
          )
          logger.info(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "success")
      except Exception:
          exc_traceback = traceback.format_exc()
          msg = f"链上交易已确认, 但利润统计失败\n{exc_traceback}"
          logger.error(msg)
          add_state_flow_entry(self.process_item, self.current_state, msg, "fail")

      self._set_state(self.STATE_COMPLETED)
  491. # def _wait_exchange_rollback(self):
  492. # """
  493. # 市价进行交易所交易回滚
  494. # """
  495. # msg = "执行:中心化交易所卖出现货回滚..."
  496. # logger.info(msg)
  497. # add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  498. # try:
  499. # # 使用预提现数量进行回滚
  500. # pseudo_amount_to_sell = Decimal(self.already_bought_amount)
  501. # # 处理精度
  502. # pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  503. # # 交易所币余额判断
  504. # with self.mexc_lock:
  505. # balances = self.mexc_data['account_info']['balances']
  506. # for balance in balances:
  507. # if balance['asset'] == self.coin:
  508. # pseudo_amount_to_sell = min(Decimal(balance['free']), pseudo_amount_to_sell)
  509. # if pseudo_amount_to_sell * self.mexc_price < Decimal('10'):
  510. # msg = f"交易所剩余{self.coin}: {balance['free']}, 小于10, 不能触发回滚交易。"
  511. # logger.info(msg)
  512. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  513. # self._set_state(self.STATE_FAILED)
  514. # return
  515. # else:
  516. # msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备使用:{pseudo_amount_to_sell}, 余额校验通过(可以回滚)。"
  517. # logger.info(msg)
  518. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  519. # break
  520. # ready_to_sell = pseudo_amount_to_sell
  521. # # 初始化 quantity 变量
  522. # quantity_for_api = None
  523. # # 用求余法判断是否是整数
  524. # if ready_to_sell % 1 == 0:
  525. # # 如果是整数,转换为 int 类型。某些API可能只接受整数交易对的整数数量
  526. # quantity_for_api = int(ready_to_sell)
  527. # else:
  528. # # 如果是非整数,转换为 float 类型。这是最常见的API数量类型
  529. # quantity_for_api = float(ready_to_sell)
  530. # order_params = {
  531. # "symbol": self.symbol.replace('_', ''),
  532. # "side": "SELL",
  533. # "price": self.buy_price,
  534. # "type": "LIMIT",
  535. # "quantity": quantity_for_api,
  536. # }
  537. # order_params_formated = pformat(order_params, indent=2)
  538. # exchange_sell_order = mexc.trade.post_order(order_params)
  539. # exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
  540. # if 'orderId' not in exchange_sell_order:
  541. # msg = f"【回滚】交易所现货卖出下单失败\n params:{order_params_formated}\norder: {exchange_sell_order_formated}"
  542. # logger.error(msg)
  543. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  544. # self._set_state("FAILED")
  545. # return
  546. # exchange_sell_order_id = exchange_sell_order['orderId']
  547. # msg = f"【回滚】交易所现货卖出订单已发送, 订单ID: {exchange_sell_order_id}"
  548. # logger.info(msg)
  549. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  550. # # 查询交易所订单状态
  551. # last_query_rst = None
  552. # while True:
  553. # params = {
  554. # "symbol": self.symbol.replace('_', ''),
  555. # "orderId": exchange_sell_order_id
  556. # }
  557. # order = mexc.trade.get_order(params)
  558. # order_formated = pformat(order, indent=2)
  559. # last_query_rst = order
  560. # if order['status'] == "FILLED":
  561. # money = Decimal(order['cummulativeQuoteQty'])
  562. # amount = self.exchange_buy_amount
  563. # price = money / amount
  564. # price = price.quantize(self.price_precision, rounding=ROUND_DOWN)
  565. # msg = f"【回滚】交易所现货卖出订单已完全成交, 价格:{price}。\norder: {order_formated}"
  566. # logger.info(msg)
  567. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  568. # self._set_state(self.STATE_FAILED)
  569. # return
  570. # else:
  571. # # 继续等待成交
  572. # pass
  573. # time.sleep(1)
  574. # last_query_rst_formated = pformat(last_query_rst, indent=2)
  575. # msg = f"【回滚】回滚交易订单查询超时, 订单ID: {exchange_sell_order_id}\n最终状态:{last_query_rst_formated}"
  576. # logger.info(msg)
  577. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  578. # self._set_state(self.STATE_FAILED)
  579. # except Exception as e:
  580. # exc_traceback = traceback.format_exc()
  581. # msg = f"【回滚】交易所回滚交易失败\n{exc_traceback}"
  582. # logger.error(msg)
  583. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  584. # self._set_state(self.STATE_FAILED)
  585. # # traceback.print_exc()
  586. # def _wait_transfer_arrive(self):
  587. # """
  588. # 等待资产在交易所内到账
  589. # """
  590. # msg = f"等待资产在交易所到账..."
  591. # logger.info(msg)
  592. # add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  593. # try:
  594. # is_arrived = False
  595. # # 先進行快速提現判斷,如果不滿足條件就走後面的等待充值模式,雙模,這個步驟最多等待10分鐘
  596. # waiting_times = 10
  597. # last_deposit_state = None
  598. # while waiting_times > 0:
  599. # time.sleep(60)
  600. # with self.mexc_lock:
  601. # deposit_list = copy.deepcopy(self.mexc_data['deposit_list'])
  602. # # 是否已經在列表中了,抹茶識別充值會稍微有點慢,所以要耐心等
  603. # is_list = False
  604. # # pending中的數量
  605. # pending_amount = Decimal(0)
  606. # for deposit in deposit_list:
  607. # # 不屬于該路徑需要監聽的代幣
  608. # if deposit['coin'] != f'{self.base_coin}-{self.NETWORK}':
  609. # continue
  610. # # 處理pending數量
  611. # if Decimal(deposit['confirmTimes']) < Decimal(deposit['unlockConfirm']):
  612. # pending_amount = pending_amount + Decimal(deposit['amount'])
  613. # # 檢查到沒到列表中
  614. # if deposit['transHash'] != self.chain_tx_hash:
  615. # continue
  616. # last_deposit_state = deposit
  617. # is_list = True
  618. # # 檢查是否滿足快速提現的條件
  619. # if is_list:
  620. # # 交易所代幣余额判断
  621. # with self.mexc_lock:
  622. # balances = copy.deepcopy(self.mexc_data['account_info']['balances'])
  623. # asset_balance = 0
  624. # for balance in balances:
  625. # if balance['asset'] == self.base_coin:
  626. # asset_balance = Decimal(balance['free'])
  627. # # 交易所賣出餘額
  628. # buy_value = self.buy_value
  629. # # 最終判斷
  630. # if buy_value + asset_balance > pending_amount:
  631. # last_deposit_state_formated = pformat(last_deposit_state, indent=2)
  632. # msg = f"【flash】资产可以進行快速提現。\nbuy_value {buy_value}, asset_balance {asset_balance}, pending_amount {pending_amount}\n{last_deposit_state_formated}"
  633. # logger.info(msg)
  634. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  635. # self._set_state(self.STATE_TRANSFERRING_TO_CHAIN)
  636. # return
  637. # else:
  638. # logger.info(f"buy_value{buy_value}, asset_balance{asset_balance}, pending_amount{pending_amount}")
  639. # logger.info(f"正在檢查快速提現條件...({waiting_times}/10)")
  640. # waiting_times = waiting_times - 1
  641. # # 最多等待30分钟
  642. # waiting_times = 30
  643. # last_deposit_state = None
  644. # last_deposit_state_formated = None
  645. # while waiting_times > 0:
  646. # with self.mexc_lock:
  647. # deposit_list = copy.deepcopy(self.mexc_data['deposit_list'])
  648. # for deposit in deposit_list:
  649. # if deposit['transHash'] != self.chain_tx_hash:
  650. # continue
  651. # last_deposit_state = deposit
  652. # last_deposit_state_formated = pformat(last_deposit_state, indent=2)
  653. # logger.info(f"等待资产在交易所到账...({deposit['confirmTimes']}/{deposit['unlockConfirm']})")
  654. # if Decimal(deposit['confirmTimes']) >= Decimal(deposit['unlockConfirm']):
  655. # is_arrived = True
  656. # if is_arrived:
  657. # msg = f"资产已在交易所到账。\n{last_deposit_state_formated}"
  658. # logger.info(msg)
  659. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  660. # self._set_state(self.STATE_TRANSFERRING_TO_CHAIN)
  661. # return
  662. # time.sleep(60)
  663. # waiting_times = waiting_times - 1
  664. # msg = f"等待充值到账超时(超过30分钟)。\n{last_deposit_state_formated}"
  665. # logger.error(msg)
  666. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  667. # self._set_state(self.STATE_FAILED)
  668. # except Exception as e:
  669. # msg = f"查询交易所到账状态时发生错误:{e}"
  670. # logger.error(msg)
  671. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  672. # self._set_state(self.STATE_FAILED)
  673. # traceback.print_exc()
  674. # def _execute_transfer_to_chain(self):
  675. # """
  676. # 将交易后获得的币(例如RATO)转账回链上
  677. # """
  678. # msg = "执行:交易所计价资产转账回链上..."
  679. # logger.info(msg)
  680. # add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  681. # try:
  682. # self.exchange_buy_amount = self.exchange_buy_amount.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  683. # pseudo_withdrawal_amount = str(self.exchange_buy_amount)
  684. # withdrawal_params = {
  685. # 'coin': self.coin,
  686. # 'netWork': 'ETH',
  687. # 'address': self.user_wallet,
  688. # 'amount': pseudo_withdrawal_amount
  689. # }
  690. # withdrawal_params_formated = pformat(withdrawal_params, indent=2)
  691. # withdrawal_rst = mexc.wallet.post_withdraw(withdrawal_params)
  692. # withdrawal_rst_formated = pformat(withdrawal_rst, indent=2)
  693. # if "id" not in withdrawal_rst:
  694. # msg = f"交易所提现失败\n參數: {withdrawal_params_formated}\n響應: {withdrawal_rst_formated}"
  695. # logger.error(msg)
  696. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  697. # self._set_state(self.STATE_FAILED)
  698. # else:
  699. # self.exchange_withdrawal_id = withdrawal_rst["id"]
  700. # msg = f"交易所提现已发送\n{withdrawal_rst_formated}"
  701. # logger.info(msg)
  702. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  703. # self._set_state(self.STATE_WAITING_WITHDRAWAL_CONFIRM)
  704. # except Exception as e:
  705. # msg = f"转账回链上失败: {e}"
  706. # logger.error(msg)
  707. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  708. # self._set_state(self.STATE_FAILED)
  709. # traceback.print_exc()
  710. # def _wait_withdrawal_confirm(self):
  711. # """
  712. # 等待交易所提现到链上确认
  713. # """
  714. # exchange_withdrawal_id = self.exchange_withdrawal_id
  715. # msg = f"等待交易所提现确认:{exchange_withdrawal_id}"
  716. # logger.info(msg)
  717. # add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  718. # try:
  719. # is_arrived = False
  720. # # 最多等待30分钟(60次轮询 × 每次间隔30秒)
  721. # waiting_times = 60
  722. # last_deposit_state = None
  723. # last_deposit_state_formated = None
  724. # while waiting_times > 0:
  725. # with self.mexc_lock:
  726. # withdraw_list = copy.deepcopy(self.mexc_data['withdraw_list'])
  727. # if not isinstance(withdraw_list, list):
  728. # msg = f"查询交易所提现状态时发生错误:{withdraw_list}"
  729. # logger.error(msg)
  730. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  731. # self._set_state("FAILED")
  732. # return
  733. # for withdrawal in withdraw_list:
  734. # if withdrawal['id'] != exchange_withdrawal_id:
  735. # continue
  736. # last_deposit_state = withdrawal
  737. # last_deposit_state_formated = pformat(last_deposit_state, indent=2)
  738. # if withdrawal['status'] == 7:
  739. # is_arrived = True
  740. # if is_arrived:
  741. # msg = f"提现请求已上链:\n{last_deposit_state_formated}"
  742. # logger.info(msg)
  743. # add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  744. # self._set_state(self.STATE_COMPLETED)
  745. # return
  746. # time.sleep(30)
  747. # waiting_times = waiting_times - 1
  748. # msg = f"等待提现到账超时(超过30分钟):\n{last_deposit_state_formated}"
  749. # logger.error(msg)
  750. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  751. # self._set_state(self.STATE_FAILED)
  752. # except Exception as e:
  753. # msg = f"查询交易所提现状态时发生错误:{e}"
  754. # logger.error(msg)
  755. # add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  756. # self._set_state(self.STATE_FAILED)
  757. # traceback.print_exc()