# s_erc20_to_mexc.py
  1. import time
  2. import traceback
  3. import copy
  4. import os
  5. from web3_py_client import EthClient
  6. from mexc_client import MexcClient
  7. from decimal import Decimal, ROUND_DOWN
  8. from as_utils import add_state_flow_entry
  9. from checker.logger_config import get_logger
  10. from pprint import pformat
# Primary Ethereum client, built with the project's EthClient defaults.
web3 = EthClient()
# Second Ethereum client pointed at the RPC_URL_2 environment variable;
# used later in this file as a backup node when broadcasting transactions.
web3_backup = EthClient(os.getenv("RPC_URL_2"))
# MEXC exchange API client.
mexc = MexcClient()
# Configure logging.
logger = get_logger('as')
  16. class ArbitrageProcess:
  17. def __init__(self, gas_limit_multiplier, gas_price_multiplier, process_item,
  18. core_data, core_lock,
  19. pending_data, pending_lock,
  20. mexc_data, mexc_lock,
  21. ):
  22. """
  23. 初始化套利流程
  24. Args:
  25. gas_limit_multiplier: gas limit倍数, 一般都不加倍
  26. gas_price_multiplier: gas price倍数, 可以提高交易成功率
  27. process_item: 信號發送端傳入的原始參數
  28. """
  29. self.NETWORK = 'ETH'
  30. tx = process_item['tx']
  31. tx.pop('gasPrice', None)
  32. tx.pop('value', None)
  33. tx.pop('minReceiveAmount', None)
  34. tx.pop('slippage', None)
  35. tx.pop('maxSpendAmount', None)
  36. tx.pop('signatureData', None)
  37. self.core_data = core_data
  38. self.core_lock = core_lock
  39. self.pending_data = pending_data
  40. self.pending_lock = pending_lock
  41. self.mexc_data = mexc_data
  42. self.mexc_lock = mexc_lock
  43. # symbol轉大寫
  44. self.symbol = process_item['symbol'].upper()
  45. self.coin = self.symbol.split('_')[0]
  46. self.base_coin = self.symbol.split('_')[1]
  47. # 获取eth价格
  48. with self.core_lock:
  49. self.eth_price = self.core_data['eth_price']
  50. # 获取提现信息
  51. with self.mexc_lock:
  52. self.withdraw_info = copy.deepcopy(self.mexc_data['coin_info_map'][self.base_coin][self.NETWORK])
  53. self.WITHDRAW_FEE = Decimal(self.withdraw_info['withdrawFee']) # 提現手續費
  54. withdraw_info_formated = pformat(self.withdraw_info, indent=2)
  55. logger.info(f'提現信息識別, 手續費:{self.WITHDRAW_FEE}\n{withdraw_info_formated}')
  56. self.tx = tx
  57. self.profit = Decimal(process_item['profit']) # 這個利潤是實際到手利潤
  58. self.profit_limit = Decimal(process_item['profitLimit']) # 這個利潤是實際到手利潤的limit
  59. self.gas_limit_multiplier = gas_limit_multiplier
  60. self.gas_price_multiplier = gas_price_multiplier
  61. self.from_token_addr = process_item['fromToken']
  62. self.from_token_decimal = Decimal(process_item['fromTokenDecimal'])
  63. self.to_token_addr = process_item['toToken']
  64. self.to_token_decimal = Decimal(process_item['toTokenDecimal'])
  65. self.user_exchange_wallet = process_item['userExchangeWallet']
  66. self.user_wallet = process_item['userWallet']
  67. self.process_item = process_item
  68. # 存储当前套利交易的细节信息,例如买入数量、价格等
  69. self.sell_price = Decimal(0)
  70. self.sell_value = Decimal(0) # 实际卖出价值
  71. self.buy_price = Decimal(0)
  72. self.chain_tx_hash = None # 链上买入的tx hash
  73. self.exchange_sell_amount = Decimal(process_item['exchangeOutAmount']) # 交易所卖出量
  74. self.actual_profit = Decimal(0) # 實際利潤
  75. # 定义可能的状态
  76. self.STATES = [
  77. "CHECK", # 检查余额、估算gas等
  78. "SELLING_ON_EXCHANGE", # 正在中心化交易所卖出现货
  79. "WAITING_SELL_CONFIRM", # 等待现货卖出订单确认
  80. "BUYING_ON_CHAIN", # 正在链上买入
  81. "WAITING_CHAIN_CONFIRM", # 等待链上交易确认
  82. "WAITING_EXCHANGE_ROLLBACK", # 等待交易所回滚
  83. "COMPLETED", # 套利流程完成
  84. "REJECT", # 套利被程序拒绝
  85. "FAILED" # 套利流程失败
  86. ]
  87. self.STATE_IDLE = "IDLE"
  88. self.STATE_CHECK = "CHECK"
  89. self.STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE"
  90. self.STATE_WAITING_SELL_CONFIRM = "WAITING_SELL_CONFIRM"
  91. self.STATE_BUYING_ON_CHAIN = "BUYING_ON_CHAIN"
  92. self.STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM"
  93. self.STATE_WAITING_EXCHANGE_ROLLBACK = "WAITING_EXCHANGE_ROLLBACK"
  94. self.STATE_COMPLETED = "COMPLETED"
  95. self.STATE_REJECT = "REJECT"
  96. self.STATE_FAILED = "FAILED"
  97. # 所有前置信息获取都没有问题的话就等待开机信号
  98. self.current_state = self.STATE_IDLE
  99. # --------------------------------------- 获取交易规则 ---------------------------------------
  100. exchange_info_params = {
  101. "symbols": self.symbol.replace('_', '')
  102. }
  103. exchange_info_rst = mexc.market.get_exchangeInfo(exchange_info_params)
  104. # 返回值检查
  105. if 'symbols' not in exchange_info_rst or len(exchange_info_rst['symbols']) != 1:
  106. params_formated = pformat(exchange_info_params, indent=2)
  107. info_formated = pformat(exchange_info_rst, indent=2)
  108. msg = f'获取交易规则时出现错误\n{exchange_info_params}\n{info_formated}'
  109. logger.error(msg)
  110. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  111. self.current_state = self.STATE_FAILED
  112. return
  113. # 返回的交易对信息核对]
  114. exchange_info = exchange_info_rst['symbols'][0]
  115. if exchange_info['symbol'].upper() != self.symbol.replace('_', ''):
  116. info_formated = pformat(exchange_info, indent=2)
  117. msg = f'获取到的交易规则与交易币对无关\n{info_formated}'
  118. logger.error(msg)
  119. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  120. self.current_state = self.STATE_FAILED
  121. return
  122. # 精度取得, 假如是RATOUSDT这个交易对的话:
  123. self.coin_asset_precision = Decimal(f'1e-{exchange_info['baseAssetPrecision']}') # 这是RATO的精度
  124. self.base_coin_asset_precision = Decimal(f'1e-{exchange_info['quoteAssetPrecision']}') # 这是USDT的精度
  125. self.price_precision = Decimal(f'1e-{exchange_info['quotePrecision']}') # 这是价格的精度
  126. def _set_state(self, state):
  127. """
  128. 设置系统状态,并打印日志
  129. """
  130. if state in self.STATES:
  131. logger.info(f"状态变更:{self.current_state} -> {state}")
  132. logger.info('')
  133. self.current_state = state
  134. else:
  135. logger.error(f"尝试设置无效状态:{state}")
  136. def run_arbitrage_step(self):
  137. """
  138. 根据当前状态执行套利流程的下一步
  139. 这是一个周期性调用的函数,例如在主循环中调用
  140. """
  141. if self.current_state == self.STATE_CHECK:
  142. self._execute_check()
  143. elif self.current_state == self.STATE_SELLING_ON_EXCHANGE:
  144. self._execute_sell_on_exchange()
  145. elif self.current_state == self.STATE_WAITING_SELL_CONFIRM:
  146. self._wait_sell_confirm()
  147. elif self.current_state == self.STATE_BUYING_ON_CHAIN:
  148. self._execute_buy_on_chain()
  149. elif self.current_state == self.STATE_WAITING_CHAIN_CONFIRM:
  150. self._wait_chain_confirm()
  151. elif self.current_state == self.STATE_WAITING_EXCHANGE_ROLLBACK:
  152. self._wait_exchange_rollback()
  153. elif self.current_state == self.STATE_COMPLETED:
  154. msg = "套利流程成功完成!"
  155. logger.info(msg)
  156. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  157. elif self.current_state == self.STATE_REJECT:
  158. msg = "套利流程被程序拒绝"
  159. logger.error(msg)
  160. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  161. elif self.current_state == self.STATE_FAILED:
  162. msg = "套利流程失败!"
  163. logger.error(msg)
  164. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  165. def _execute_check(self):
  166. """
  167. 前置检查,防止低能错误
  168. """
  169. try:
  170. # step1,檢查交易所的餘額是否夠用
  171. # 处理精度
  172. pseudo_amount_to_sell = self.exchange_sell_amount.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  173. # 交易所套保余额判断
  174. with self.mexc_lock:
  175. balances = self.mexc_data['account_info']['balances']
  176. for balance in balances:
  177. if balance['asset'] == self.coin:
  178. if Decimal(balance['free']) < pseudo_amount_to_sell:
  179. msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 不能触发套保交易。"
  180. logger.info(msg)
  181. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  182. self._set_state(self.STATE_REJECT)
  183. return
  184. else:
  185. msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 余额校验通过(可以套保)。"
  186. logger.info(msg)
  187. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  188. break
  189. # step2,估算gas
  190. logger.info("獲取區塊信息")
  191. with self.core_lock:
  192. latest_block = copy.deepcopy(self.core_data['block'])
  193. self.tx['maxPriorityFeePerGas'] = int(int(self.tx['maxPriorityFeePerGas']) * self.gas_price_multiplier)
  194. self.tx['maxFeePerGas'] = int(int(latest_block['baseFeePerGas']) * 2 + self.tx['maxPriorityFeePerGas'])
  195. gas_price = Decimal(self.tx['maxPriorityFeePerGas'] + self.tx['maxFeePerGas'])
  196. gas_price_gwei = gas_price / Decimal('1e9')
  197. gas_price_gwei = gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
  198. tx_formated = pformat(self.tx, indent=2)
  199. logger.info(f"鏈上各種校驗\n{tx_formated}")
  200. estimated_gas_origin = web3.w3.eth.estimate_gas(self.tx)
  201. estimated_gas = int(estimated_gas_origin * self.gas_limit_multiplier)
  202. estimated_wei = Decimal(estimated_gas) * gas_price
  203. estimated_eth = Decimal(estimated_wei / Decimal('1e18')) / Decimal(2) # 除以2才是比較接近正常消耗的gas費,否則會過於高估
  204. estimated_eth = estimated_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  205. msg = f"估算的燃气量: {estimated_gas}, eth消耗: {estimated_eth}, gas price: {gas_price_gwei} gwei, gas估算通過"
  206. logger.info(msg)
  207. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  208. # step3, 費用與利潤比較
  209. estimated_eth_value = estimated_eth * self.eth_price
  210. estimated_eth_value = estimated_eth_value.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  211. cost = estimated_eth_value + self.WITHDRAW_FEE # 成本
  212. if self.profit < cost:
  213. msg = f"費用判斷不通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
  214. logger.info(msg)
  215. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  216. self._set_state(self.STATE_REJECT)
  217. return
  218. msg = f"費用判斷通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
  219. logger.info(msg)
  220. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  221. # step4, 與賬戶eth餘額比對(至少留0.002,不然沒gas了)
  222. MARGIN = Decimal(0.002)
  223. # 暫時鎖住core_data
  224. with self.core_lock:
  225. eth_balance = self.core_data['eth_balance']
  226. if eth_balance - estimated_eth < MARGIN:
  227. msg = f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
  228. logger.info(msg)
  229. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  230. self._set_state(self.STATE_REJECT)
  231. return
  232. # 餘額判斷通過后預扣除balance,防止綫程更新不及時導致其他綫程誤發送tx
  233. self.core_data['eth_balance'] = self.core_data['eth_balance'] - estimated_eth
  234. msg = f"gas餘額判斷通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
  235. logger.info(msg)
  236. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  237. # final, 設定交易狀態,開始交易
  238. self._set_state(self.STATE_SELLING_ON_EXCHANGE)
  239. except Exception as e:
  240. exc_traceback = traceback.format_exc()
  241. msg = f"前置檢查未通過\n{exc_traceback}"
  242. logger.error(msg)
  243. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  244. self._set_state(self.STATE_REJECT)
  245. # traceback.print_exc()
  246. # 以下是每个状态对应的具体执行函数
  247. def _execute_sell_on_exchange(self):
  248. """
  249. 在中心化交易所卖出现货
  250. """
  251. msg = "执行:中心化交易所卖出现货..."
  252. logger.info(msg)
  253. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  254. try:
  255. order_times = 0
  256. self.already_sold_amount = Decimal(0)
  257. while self.already_sold_amount < self.exchange_sell_amount and order_times < 5:
  258. order_times = order_times + 1
  259. # 第一步直接卖出,这个数量用固定数量
  260. pseudo_amount_to_sell = self.exchange_sell_amount - self.already_sold_amount
  261. # 处理精度
  262. pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  263. # 初始化 quantity 变量
  264. quantity_for_api = None
  265. # 用求余法判断是否是整数
  266. if pseudo_amount_to_sell % 1 == 0:
  267. # 如果是整数,转换为 int 类型。某些API可能只接受整数交易对的整数数量
  268. quantity_for_api = int(pseudo_amount_to_sell)
  269. else:
  270. # 如果是非整数,转换为 float 类型。这是最常见的API数量类型
  271. quantity_for_api = float(pseudo_amount_to_sell)
  272. order_params = {
  273. "symbol": self.symbol.replace('_', ''),
  274. "side": "SELL",
  275. "type": "MARKET",
  276. "quantity": quantity_for_api,
  277. }
  278. order_params_formated = pformat(order_params, indent=2)
  279. exchange_sell_order = mexc.trade.post_order(order_params)
  280. exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
  281. msg = f"交易所现货卖出订单已发送 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
  282. logger.info(msg)
  283. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  284. if 'orderId' not in exchange_sell_order:
  285. continue
  286. # 查询交易所订单状态
  287. exchange_sell_order_id = exchange_sell_order['orderId']
  288. waiting_times = 5
  289. while waiting_times > 0:
  290. params = {
  291. "symbol": self.symbol.replace('_', ''),
  292. "orderId": exchange_sell_order_id
  293. }
  294. order = mexc.trade.get_order(params)
  295. order_formated = pformat(order, indent=2)
  296. if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
  297. # 以实际成交数量为准
  298. money = Decimal(order['cummulativeQuoteQty'])
  299. self.already_sold_amount = self.already_sold_amount + Decimal(order['executedQty'])
  300. self.sell_value = self.sell_value + money
  301. self.sell_price = self.sell_value / self.already_sold_amount
  302. self.sell_price = self.sell_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  303. msg = f"交易所现货卖出订单已完成, 价格:{self.sell_price}, money: {money}\n order: {order_formated}"
  304. logger.info(msg)
  305. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  306. break
  307. else:
  308. msg = f"交易所现货卖出失败\n order: {order_formated}"
  309. logger.error(msg)
  310. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  311. time.sleep(1)
  312. waiting_times = waiting_times - 1
  313. if order_times < 5:
  314. msg = 'mexc现货卖出流程完成'
  315. logger.info(msg)
  316. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  317. self._set_state(self.STATE_BUYING_ON_CHAIN)
  318. else:
  319. msg = 'mexc现货卖出流程失败, 重试次数大于5'
  320. logger.info(msg)
  321. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  322. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  323. except Exception as e:
  324. exc_traceback = traceback.format_exc()
  325. msg = f"交易所现货卖出下单失败\n{exc_traceback}"
  326. logger.error(msg)
  327. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  328. self._set_state(self.STATE_FAILED)
  329. # traceback.print_exc()
  330. def _execute_buy_on_chain(self):
  331. """
  332. 在链上执行买入操作
  333. """
  334. msg = "执行:链上买入操作..."
  335. logger.info(msg)
  336. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  337. try:
  338. # 交易前nonce
  339. with self.core_lock:
  340. self.tx['nonce'] = self.core_data['nonce']
  341. # 调用链上客户端执行买入交易
  342. signed_tx = web3._sign(self.tx, self.gas_limit_multiplier)
  343. self.chain_tx_hash = web3.w3.to_hex(signed_tx.hash)
  344. try:
  345. # 主要节点先发交易
  346. web3.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
  347. # 使用备用节点再发一次交易
  348. web3_backup.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
  349. except Exception as e:
  350. msg = f"据反饋說链上买入失败:{e}, 交易哈希:{self.chain_tx_hash}"
  351. logger.error(msg)
  352. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  353. # 交易成功后刷新全局nonce
  354. with self.core_lock:
  355. self.core_data['nonce'] = self.core_data['nonce'] + 1
  356. block_number = self.core_data['block_number']
  357. # 將hash放入pending裏,等待確認
  358. with self.pending_lock:
  359. self.pending_data[self.chain_tx_hash] = {
  360. "block_number": block_number,
  361. "tx_details": None,
  362. "reponse": None,
  363. }
  364. # 交易成功
  365. msg = f"再次確認交易是否上鏈:{self.chain_tx_hash}"
  366. logger.info(msg)
  367. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  368. self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
  369. except Exception as e:
  370. exc_traceback = traceback.format_exc()
  371. msg = f"鏈上買入未處理的錯誤, 交易哈希:{self.chain_tx_hash}\n{exc_traceback}"
  372. logger.error(msg)
  373. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  374. self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
  375. # traceback.print_exc()
  376. def _wait_chain_confirm(self):
  377. """
  378. 等待链上交易确认
  379. """
  380. chain_tx_hash = self.chain_tx_hash
  381. msg = f"等待链上交易确认:{chain_tx_hash}"
  382. logger.info(msg)
  383. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  384. try:
  385. # 給120秒時間進行確認
  386. waiting_times = 120
  387. while waiting_times > 0:
  388. with self.pending_lock:
  389. tx_details = copy.deepcopy(self.pending_data[chain_tx_hash]['tx_details'])
  390. if tx_details is None:
  391. waiting_times = waiting_times - 1
  392. time.sleep(1)
  393. continue
  394. # # 交易確認后,移除出pending列表
  395. # with self.pending_lock:
  396. # del self.pending_data[chain_tx_hash]
  397. # 交易失敗的邏輯處理,直接進行回滾
  398. if 'fromTokenDetails' not in tx_details \
  399. or 'toTokenDetails' not in tx_details:
  400. msg = f"链上交易失敗。{tx_details}"
  401. logger.info(msg)
  402. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  403. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  404. break
  405. tx_details_formated = pformat(tx_details, indent=2)
  406. msg = f"链上交易已确认。\n details: {tx_details_formated}"
  407. logger.info(msg)
  408. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  409. # 獲取交易信息
  410. from_token_details = tx_details['fromTokenDetails']
  411. to_token_details = tx_details['toTokenDetails']
  412. from_token_amount = Decimal(from_token_details['amount'])
  413. from_token_amount_human = from_token_amount / (Decimal(10) ** self.from_token_decimal)
  414. from_token_amount_human = from_token_amount_human.quantize(self.base_coin_asset_precision, rounding=ROUND_DOWN)
  415. to_token_amount = Decimal(to_token_details['amount'])
  416. to_token_amount_human = to_token_amount / (Decimal(10) ** self.to_token_decimal)
  417. self.buy_price = from_token_amount_human / to_token_amount_human
  418. self.buy_price = self.buy_price.quantize(self.price_precision, rounding=ROUND_DOWN)
  419. # 交易預估利潤百分比計算
  420. rate = self.sell_price / self.buy_price
  421. rate = rate.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  422. msg = f"【比率{rate}】。用{from_token_amount_human}买入{to_token_amount_human},价格{self.buy_price}。"
  423. logger.info(msg)
  424. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  425. # 判斷快速二賣條件
  426. diff = to_token_amount_human - self.exchange_sell_amount
  427. diff = diff.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  428. value = diff * self.sell_price
  429. value = value.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  430. if value > 2:
  431. msg = f"滿足二賣條件,{diff}*{self.sell_price} = {value}"
  432. logger.info(msg)
  433. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  434. # 初始化 quantity 变量
  435. quantity_for_api = None
  436. # 用求余法判断是否是整数
  437. if diff % 1 == 0:
  438. # 如果是整数,转换为 int 类型。某些API可能只接受整数交易对的整数数量
  439. quantity_for_api = int(diff)
  440. else:
  441. # 如果是非整数,转换为 float 类型。这是最常见的API数量类型
  442. quantity_for_api = float(diff)
  443. order_params = {
  444. "symbol": self.symbol.replace('_', ''),
  445. "side": "SELL",
  446. "type": "MARKET",
  447. "quantity": quantity_for_api,
  448. }
  449. order_params_formated = pformat(order_params, indent=2)
  450. exchange_sell_order = mexc.trade.post_order(order_params)
  451. exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
  452. if 'orderId' not in exchange_sell_order:
  453. msg = f"交易所现货二卖下单失败 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
  454. logger.error(msg)
  455. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  456. else:
  457. oid = exchange_sell_order['orderId']
  458. # 查询交易所订单状态
  459. waiting_times_inner = 30
  460. last_order = None
  461. while waiting_times_inner > 0:
  462. params = {
  463. "symbol": self.symbol.replace('_', ''),
  464. "orderId": oid
  465. }
  466. order = mexc.trade.get_order(params)
  467. order_formated = pformat(order, indent=2)
  468. last_order = order
  469. if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
  470. money = Decimal(order['cummulativeQuoteQty'])
  471. msg = f"交易所现货二卖订单已完成, money: {money}。\n order: {order_formated}"
  472. logger.info(msg)
  473. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  474. break
  475. else:
  476. time.sleep(1)
  477. waiting_times_inner = waiting_times_inner - 1
  478. if waiting_times_inner <= 0:
  479. last_order_formated = pformat(last_order, indent=2)
  480. msg = f"交易所现货二卖订单失敗, 最後狀態:{last_order_formated}。"
  481. logger.info(msg)
  482. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  483. else:
  484. msg = f"不滿足二賣條件,{diff}*{self.sell_price} = {value}"
  485. logger.info(msg)
  486. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  487. # 計算實際利潤
  488. actual_profit = value
  489. actual_gas_price = Decimal(tx_details['gasPrice'])
  490. actual_gas_price_gwei = actual_gas_price / Decimal('1e9')
  491. actual_gas_price_gwei = actual_gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
  492. actual_gas_used = Decimal(tx_details['gasUsed'])
  493. actual_wei = actual_gas_price * actual_gas_used
  494. actual_eth = actual_wei / Decimal('1e18')
  495. actual_eth = actual_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
  496. actual_fee_used = actual_eth * self.eth_price
  497. actual_fee_used = actual_fee_used.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  498. actual_profit = value - actual_fee_used - self.WITHDRAW_FEE
  499. msg = f"【最終利潤】{actual_profit}{self.base_coin}(已扣除所有手續費、滑點)\
  500. \n鏈上ETH使用: {actual_eth}({actual_fee_used} USD), gas_price: {actual_gas_price_gwei} GWEI, gas_used: {actual_gas_used}\
  501. \n交易所出售代幣利潤: {value}, 提現手續費: {self.WITHDRAW_FEE}\
  502. "
  503. logger.info(msg)
  504. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  505. self._set_state(self.STATE_COMPLETED)
  506. break
  507. # 如果300秒都沒確認成功,該交易大概率沒有上鏈
  508. if waiting_times <= 0:
  509. with self.pending_lock:
  510. response = copy.deepcopy(self.pending_data[chain_tx_hash])
  511. response_formated = pformat(response, indent=2)
  512. msg = f"链上交易确认失败:{chain_tx_hash}\n{response_formated}"
  513. logger.error(msg)
  514. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  515. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  516. except Exception as e:
  517. exc_traceback = traceback.format_exc()
  518. msg = f"查询链上确认状态时发生错误\n{exc_traceback}"
  519. logger.error(msg)
  520. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  521. self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
  522. # traceback.print_exc()
  523. def _wait_exchange_rollback(self):
  524. """
  525. 市价进行交易所交易回滚
  526. """
  527. msg = "执行:中心化交易所买入现货回滚..."
  528. logger.info(msg)
  529. add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
  530. try:
  531. # 使用预提现数量进行回滚
  532. pseudo_amount_to_buy = Decimal(self.sell_value)
  533. # 处理精度
  534. pseudo_amount_to_buy = pseudo_amount_to_buy.quantize(self.base_coin_asset_precision, rounding=ROUND_DOWN)
  535. # 剩余计价币余额
  536. free_balance = Decimal(0)
  537. # 交易所U余额判断
  538. with self.mexc_lock:
  539. balances = self.mexc_data['account_info']['balances']
  540. for balance in balances:
  541. if balance['asset'] == self.base_coin:
  542. free_balance = Decimal(balance['free'])
  543. pseudo_amount_to_buy = min(free_balance, pseudo_amount_to_buy)
  544. if pseudo_amount_to_buy < Decimal('10'):
  545. msg = f"交易所剩余{self.base_coin}: {free_balance}, 小于10, 不能触发回滚交易。"
  546. logger.info(msg)
  547. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  548. self._set_state(self.STATE_FAILED)
  549. return
  550. else:
  551. msg = f"交易所剩余{self.base_coin}: {free_balance}, 交易所准备使用:{pseudo_amount_to_buy}, 余额校验通过(可以回滚)。"
  552. logger.info(msg)
  553. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  554. break
  555. # 有可能会遇到手续费占用问题
  556. ready_to_buy = self.exchange_sell_amount
  557. if self.exchange_sell_amount * self.sell_price * Decimal(1.002) >= free_balance:
  558. ready_to_buy = (free_balance * Decimal(0.998)) / self.sell_price
  559. ready_to_buy = ready_to_buy.quantize(self.coin_asset_precision, rounding=ROUND_DOWN)
  560. # 初始化 quantity 变量
  561. quantity_for_api = None
  562. # 用求余法判断是否是整数
  563. if ready_to_buy % 1 == 0:
  564. # 如果是整数,转换为 int 类型。某些API可能只接受整数交易对的整数数量
  565. quantity_for_api = int(ready_to_buy)
  566. else:
  567. # 如果是非整数,转换为 float 类型。这是最常见的API数量类型
  568. quantity_for_api = float(ready_to_buy)
  569. order_params = {
  570. "symbol": self.symbol.replace('_', ''),
  571. "side": "BUY",
  572. "price": self.sell_price,
  573. "type": "LIMIT",
  574. "quantity": quantity_for_api,
  575. }
  576. order_params_formated = pformat(order_params, indent=2)
  577. exchange_buy_order = mexc.trade.post_order(order_params)
  578. exchange_buy_order_formated = pformat(exchange_buy_order, indent=2)
  579. if 'orderId' not in exchange_buy_order:
  580. msg = f"【回滚】交易所现货买入下单失败\n params:{order_params_formated}\norder: {exchange_buy_order_formated}"
  581. logger.error(msg)
  582. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  583. self._set_state("FAILED")
  584. return
  585. exchange_buy_order_id = exchange_buy_order['orderId']
  586. msg = f"【回滚】交易所现货买入订单已发送, 订单ID: {exchange_buy_order_id}"
  587. logger.info(msg)
  588. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  589. # 查询交易所订单状态
  590. last_query_rst = None
  591. while True:
  592. params = {
  593. "symbol": self.symbol.replace('_', ''),
  594. "orderId": exchange_buy_order_id
  595. }
  596. order = mexc.trade.get_order(params)
  597. order_formated = pformat(order, indent=2)
  598. last_query_rst = order
  599. if order['status'] == "FILLED":
  600. money = Decimal(order['cummulativeQuoteQty'])
  601. amount = self.exchange_sell_amount
  602. price = money / amount
  603. price = price.quantize(self.price_precision, rounding=ROUND_DOWN)
  604. msg = f"【回滚】交易所现货买入订单已完全成交, 价格:{price}。\norder: {order_formated}"
  605. logger.info(msg)
  606. add_state_flow_entry(self.process_item, self.current_state, msg, "success")
  607. self._set_state(self.STATE_FAILED)
  608. return
  609. else:
  610. # 继续等待成交
  611. pass
  612. time.sleep(1)
  613. last_query_rst_formated = pformat(last_query_rst, indent=2)
  614. msg = f"【回滚】回滚交易订单查询超时, 订单ID: {exchange_buy_order_id}\n最终状态:{last_query_rst_formated}"
  615. logger.info(msg)
  616. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  617. self._set_state(self.STATE_FAILED)
  618. except Exception as e:
  619. exc_traceback = traceback.format_exc()
  620. msg = f"【回滚】交易所回滚交易失败\n{exc_traceback}"
  621. logger.error(msg)
  622. add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
  623. self._set_state(self.STATE_FAILED)
  624. # traceback.print_exc()