as.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. # -*- coding: utf-8 -*-
  2. from checker.logger_config import get_logger
  3. logger = get_logger('as')
  4. logger.info('\n\n----------------------------------------------as啓動--------------------------------------------')
  5. import threading
  6. import uuid # 用于生成唯一的流程ID
  7. import time
  8. import logging
  9. import s_erc20_to_mexc
  10. import s_mexc_to_erc20
  11. import web3_py_client
  12. import traceback
  13. import copy
  14. import sys
  15. from decimal import Decimal, ROUND_DOWN
  16. from flask import Flask, request, jsonify
  17. from flask_cors import CORS # 导入
  18. from as_utils import get_formatted_timestamp
  19. from as_utils import add_state_flow_entry
  20. from config import wallet
  21. from config import withdraw
  22. from binance.client import Client # 用于获取ETH价格
  23. from checker import ok_chain_client
  24. from mexc_client import MexcClient
  25. from pprint import pprint
  26. from pprint import pformat
  27. ok_chain_client.api_config = {
  28. "api_key": 'a05643ab-fb17-402b-94a8-a886bd343301', # 请替换为您的真实 API Key
  29. "secret_key": '9D59B53EB1E60B1B5F290D3698A8C9DA', # 请替换为您的真实 Secret Key
  30. "passphrase": 'Qwe123123.', # 请替换为您的真实 Passphrase
  31. }
  32. # 配置日志
  33. log = logging.getLogger('werkzeug')
  34. log.setLevel(logging.ERROR)
  35. web3 = web3_py_client.EthClient()
  36. w3 = web3.w3
  37. mexc = MexcClient()
  38. USER_WALLET = wallet["user_wallet"]
  39. USER_EXCHANGE_WALLET = wallet["user_exchange_wallet"]
  40. # 该代币最后一次执行套利的区块信息 (如果需要防止过于频繁的同类套利,不然变成砸盘、拉盘的了)
  41. last_process_info = {} # 示例: {"RATO_USDT": 0}
  42. MIN_BLOCKS_BETWEEN_ARB = Decimal(5) # 在重试相同交易对之前等待几个区块
  43. # --- 全局状态和锁 ---
  44. processing_list = [] # 正在处理的任务列表
  45. history_process_list = [] # 已完成的任务历史列表
  46. list_lock = threading.Lock() # 用于修改 processing_list 和 history_process_list 结构的锁
  47. # --- 一些核心數據和鎖 ---
  48. core_data = {
  49. "nonce": 0, # 全局 Nonce
  50. "eth_balance": Decimal(0), # 全局 eth餘額
  51. "eth_price": 0, # 全局 eth價格
  52. "block_number": 0, # 全局 區塊號
  53. "block": None, # 全局 最后一個區塊的信息
  54. }
  55. core_lock = threading.Lock() # 核心數據的锁
  56. # --- pending數據和鎖 ---
  57. pending_data = {
  58. # # 數據結構的demo
  59. # "0xaf181bbbf5bf56d9204bd18cd25abd90e51890e848525e7788b410689c0c26a4": {
  60. # "block_number": 22570370, # 提交pending時的區塊,隔幾個區塊去獲取數據會更準確
  61. # "tx_details": None, # okapi解析的數據, None就是還沒有獲取到
  62. # "reponse": None, # okapi最後一次獲取到的數據
  63. # },
  64. }
  65. pending_lock = threading.Lock()
  66. PENDING_CONFIRM_BLOCK = 3 # 需要幾個區塊才進行確認
  67. # --- mexc相關數據和鎖 ---
  68. mexc_data = {
  69. "account_info": {},
  70. "deposit_list": [],
  71. "withdraw_list": [],
  72. "coin_info_map": {}, # 處理過的幣種信息,coin_info_map[coin][network]
  73. }
  74. mexc_lock = threading.Lock()
  75. CHAIN_ID = -1
  76. try:
  77. if w3.provider:
  78. CHAIN_ID = w3.eth.chain_id
  79. logger.info(f"Web3 已连接。chain_id={CHAIN_ID}")
  80. else:
  81. logger.info("Web3 未连接。")
  82. except Exception as e:
  83. logger.info(f"初始化 {USER_WALLET} 的全局 nonce 时出错: {e}")
  84. # Binance 客户端 (无需API Key/Secret即可获取公开行情数据)
  85. try:
  86. binance_client = Client()
  87. # 测试连接 (可选,但建议)
  88. binance_client.ping()
  89. logger.info("成功连接到 Binance API。")
  90. except Exception as e:
  91. logger.error(f"初始化 Binance Client 时发生错误: {e}")
  92. binance_client = None
  93. # --- Flask 应用 ---
  94. app = Flask(__name__)
  95. CORS(app) # 在创建 app 实例后启用 CORS
  96. WITHDRAWAL_COOLDOWN = 180 # 秒,成功提现后冷却60秒
  97. def move_completed_process_to_history(process_id_to_move: str) -> bool:
  98. """
  99. 将一个完成的 process_item 从 processing_list 移动到 history_process_list。
  100. 此操作是线程安全的。
  101. Args:
  102. process_id_to_move (str): 要移动的 process_item 的 ID。
  103. Returns:
  104. bool: 如果成功找到并移动了 item,则返回 True,否则返回 False。
  105. """
  106. global processing_list, history_process_list # 因为我们要修改这两个列表
  107. item_to_move = None
  108. moved_successfully = False
  109. with list_lock:
  110. # 查找并从 processing_list 中移除
  111. found_index = -1
  112. for i, item in enumerate(processing_list):
  113. if item.get('id') == process_id_to_move:
  114. found_index = i
  115. break
  116. if found_index != -1:
  117. item_to_move = processing_list.pop(found_index) # 从 processing_list 中移除并获取它
  118. # 假设在 item_to_move 中,其 currentState 已经被 arbitrage_process_flow 更新为 COMPLETED 或 FAILED
  119. # arbitrage_process.add_state_flow_entry(item_to_move, "MOVED_TO_HISTORY", f"流程处理完毕,移至历史记录。最终状态: {item_to_move.get('currentState', 'N/A')}", "info")
  120. history_process_list.append(item_to_move) # 添加到 history_process_list
  121. logger.info(f"已将 process_id: {process_id_to_move} 从 processing_list 移动到 history_process_list。")
  122. moved_successfully = True
  123. else:
  124. logger.warning(f"尝试移动到 history_list 时,在 processing_list 中未找到 process_id: {process_id_to_move}")
  125. return moved_successfully
  126. # 策略構建器
  127. def strategy_builder(process_item):
  128. profit = Decimal(process_item['profit'])
  129. profitLimit = Decimal(process_item['profitLimit'])
  130. strategy = process_item['strategy']
  131. # 對於高利潤交易,進行適當加速
  132. gas_limit_multiplier = 1
  133. gas_price_multiplier = 1
  134. # if profit > Decimal(5) * profitLimit:
  135. # gas_price_multiplier = 5
  136. # elif profit > Decimal(10) * profitLimit:
  137. # gas_price_multiplier = 10
  138. global core_data
  139. global core_lock
  140. global pending_data
  141. global pending_lock
  142. global mexc_data
  143. global mexc_lock
  144. process_item_formated = pformat(process_item, indent=2)
  145. logger.info(f'策略原始参数:\n{process_item_formated}')
  146. if strategy == 'erc20_to_mexc':
  147. return s_erc20_to_mexc.ArbitrageProcess(gas_limit_multiplier, gas_price_multiplier, process_item,
  148. core_data, core_lock,
  149. pending_data, pending_lock,
  150. mexc_data, mexc_lock
  151. )
  152. elif strategy == 'mexc_to_erc20':
  153. return s_mexc_to_erc20.ArbitrageProcess(gas_limit_multiplier, gas_price_multiplier, process_item,
  154. core_data, core_lock,
  155. pending_data, pending_lock,
  156. mexc_data, mexc_lock
  157. )
  158. else:
  159. logger.error(f'不存在的策略:{strategy}')
  160. # 實際套利邏輯
  161. def arbitrage_process_flow(process_item):
  162. """
  163. 在单独线程中执行的实际套利逻辑。
  164. 会直接修改 'process_item' 字典。
  165. """
  166. process_id = process_item['id']
  167. ap = strategy_builder(process_item)
  168. # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
  169. ap._set_state(ap.STATE_CHECK)
  170. # 在主循环中周期性调用 run_arbitrage_step
  171. while ap.current_state != ap.STATE_COMPLETED and ap.current_state != ap.STATE_FAILED and ap.current_state != ap.STATE_REJECT:
  172. ap.run_arbitrage_step()
  173. ap.run_arbitrage_step()
  174. move_completed_process_to_history(process_id)
  175. # --- 核心數據更新綫程函數 ---
  176. def update_core_data_periodically():
  177. """
  178. 周期性更新 nonce 和 ETH 价格的线程函数。
  179. """
  180. global core_data # 明确表示我们要修改全局的 core_data
  181. if not USER_WALLET or USER_WALLET == "你的钱包地址":
  182. logger.error("USER_WALLET 未正确配置。nonce 更新将无法进行。")
  183. # 如果 USER_WALLET 未配置,可以考虑让线程不执行 nonce 更新,或者直接退出
  184. # 这里我们选择继续运行,但 nonce 不会被更新
  185. while True:
  186. try:
  187. new_eth_price = None
  188. new_eth_balance = None
  189. new_nonce = None
  190. new_block_number = None
  191. new_block = None
  192. # 1. 从 Binance 获取 ETH 价格
  193. if binance_client:
  194. try:
  195. ticker = binance_client.get_symbol_ticker(symbol="ETHUSDT")
  196. new_eth_price = float(ticker['price'])
  197. except Exception as e:
  198. logger.error(f"从 Binance 获取 ETH 价格失败: {e}")
  199. else:
  200. logger.warning("Binance client 未初始化, 无法获取 ETH 价格。")
  201. # 2. 获取最新的 Nonce 和 最新的block_number 以及 最新的賬戶eth餘額
  202. # 确保 w3 已初始化且 USER_WALLET 已配置
  203. if w3 and w3.is_connected() and USER_WALLET and USER_WALLET != "你的钱包地址":
  204. try:
  205. new_block = w3.eth.get_block('latest')
  206. new_block_number = new_block['number']
  207. new_nonce = w3.eth.get_transaction_count(USER_WALLET, 'latest')
  208. eth_balance_origin = w3.eth.get_balance(USER_WALLET)
  209. new_eth_balance = Decimal(eth_balance_origin / (10 ** 18))
  210. new_eth_balance = new_eth_balance.quantize(Decimal('1e-6'), rounding=ROUND_DOWN)
  211. except Exception as e:
  212. logger.error(f"为 {USER_WALLET} 获取 Nonce、BlockNumber、EthBalances 失败: {e}")
  213. elif not (w3 and w3.is_connected()):
  214. logger.warning("Web3 未连接, 无法获取 nonce。")
  215. elif not (USER_WALLET and USER_WALLET != "你的钱包地址"):
  216. logger.warning("USER_WALLET 配置不正确, 无法获取 nonce。")
  217. # 3. 更新共享数据 core_data (使用锁)
  218. # 只有当获取到新数据时才更新,避免不必要的写操作和日志
  219. with core_lock:
  220. if new_eth_price is not None and core_data["eth_price"] != new_eth_price:
  221. eth_price = Decimal(new_eth_price)
  222. eth_price = eth_price.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  223. core_data["eth_price"] = eth_price
  224. # 判斷block_number是否發生變化(升高)
  225. if new_block_number is not None and new_block_number > core_data["block_number"]:
  226. core_data["block_number"] = new_block_number
  227. core_data["block"] = new_block
  228. # 區塊變了才刷新nonce,否則還是要靠本地的緩存維護
  229. if new_nonce is not None and core_data["nonce"] != new_nonce:
  230. core_data["nonce"] = new_nonce
  231. # 餘額也同理
  232. if new_eth_balance is not None and core_data["eth_balance"] != new_eth_balance:
  233. core_data["eth_balance"] = new_eth_balance
  234. # logger.info(f"核心数据已更新: ETH Price = {core_data['eth_price']}, Nonce ({USER_WALLET}) = {core_data['nonce']}, EthBalance={core_data['eth_balance']}, BlockNumber = {core_data['block_number']}")
  235. except Exception as e:
  236. # 捕获线程循环中的其他潜在错误
  237. exc_traceback = traceback.format_exc()
  238. logger.error(f"数据更新线程发生未知错误\n{exc_traceback}")
  239. # traceback.print_exc()
  240. # 等待 500ms
  241. time.sleep(0.5)
  242. # --- mexc數據更新綫程函數 ---
  243. def update_mexc_data_periodically():
  244. """
  245. 周期性更新 mexc的相關數據 的线程函数。
  246. """
  247. global mexc_data
  248. # 每60秒獲取一次coin_info
  249. coin_info_get_delay = 60
  250. while True:
  251. try:
  252. new_account_info = None
  253. new_withdraw_list = None
  254. new_deposit_list = None
  255. new_coin_info_list = None
  256. # 1. new_account_info
  257. try:
  258. new_account_info = mexc.trade.get_account_info()
  259. if 'balances' not in new_account_info:
  260. raise Exception("未找到balances")
  261. with mexc_lock:
  262. mexc_data['account_info'] = new_account_info
  263. # logger.info(f'account_info: {new_account_info['balances']}')
  264. except Exception as e:
  265. logger.error(f"从 Mexc 获取 Balance 失败: {e}, {new_account_info}")
  266. # 2. new_deposit_list
  267. try:
  268. new_deposit_list = mexc.wallet.get_deposit_list()
  269. if not isinstance(new_deposit_list, list):
  270. raise Exception("充值信息獲取錯誤")
  271. with mexc_lock:
  272. mexc_data['deposit_list'] = new_deposit_list
  273. # logger.info(f'deposit_list: {new_deposit_list[0]}')
  274. except Exception as e:
  275. logger.error(f"从 Mexc 获取 deposit_list 失败: {e}, {new_deposit_list}")
  276. # 3. new_withdraw_list
  277. try:
  278. new_withdraw_list = mexc.wallet.get_withdraw_list()
  279. if not isinstance(new_withdraw_list, list):
  280. raise Exception("提現信息獲取錯誤")
  281. with mexc_lock:
  282. mexc_data['withdraw_list'] = new_withdraw_list
  283. # logger.info(f'withdraw_list: {new_withdraw_list[0]}')
  284. except Exception as e:
  285. logger.error(f"从 Mexc 获取 withdraw_list 失败: {e}, {new_withdraw_list}")
  286. # 4. new_coin_info list
  287. try:
  288. if coin_info_get_delay >= 60:
  289. coin_info_get_delay = 0
  290. new_coin_info_list = mexc.wallet.get_coinlist()
  291. if not isinstance(new_coin_info_list, list):
  292. raise Exception("幣種信息獲取錯誤")
  293. # 處理幣種信息
  294. new_coin_info_map = {}
  295. for coin_info in new_coin_info_list:
  296. new_coin_info_map[coin_info['coin']] = {}
  297. for network in coin_info['networkList']:
  298. new_coin_info_map[coin_info['coin']][network['netWork']] = network
  299. with mexc_lock:
  300. mexc_data['coin_info_map'] = new_coin_info_map
  301. # logger.info(f'coin_info_map: {new_coin_info_map['USDT']}')
  302. except Exception as e:
  303. logger.error(f"从 Mexc 获取 withdraw_list 失败: {e}, {new_withdraw_list}")
  304. except Exception as e:
  305. # 捕获线程循环中的其他潜在错误
  306. exc_traceback = traceback.format_exc()
  307. logger.error(f"数据更新线程发生未知错误\n{exc_traceback}")
  308. # traceback.print_exc()
  309. # 幣種信息處理的delay
  310. coin_info_get_delay = coin_info_get_delay + 1
  311. # 等待 1s
  312. time.sleep(1)
  313. # --- tx pending數據獲取綫程函數 ---
  314. def update_tx_data_periodically():
  315. """
  316. 每一秒獲取一條tx數據
  317. """
  318. global pending_data # 明确表示我们要修改全局的 pending_data
  319. while True:
  320. # 等待1s
  321. time.sleep(1)
  322. try:
  323. # 使用拷貝后的數據,否則可能會出現綫程問題
  324. with pending_lock:
  325. pending_data_copy = copy.deepcopy(pending_data)
  326. # 核心數據同理
  327. with core_lock:
  328. core_data_copy = copy.deepcopy(core_data)
  329. block_number = core_data_copy['block_number']
  330. for tx in pending_data_copy:
  331. try:
  332. # 已獲取的就不要再獲取了
  333. if pending_data_copy[tx]['tx_details'] is not None:
  334. continue
  335. # PENDING_CONFIRM_BLOCK個區塊之後的才進行確認,防止回滾頻繁觸發
  336. if block_number < pending_data_copy[tx]['block_number'] + PENDING_CONFIRM_BLOCK:
  337. continue
  338. # 調用ok的api,直接獲取詳細交易
  339. ok_rst = ok_chain_client.history(CHAIN_ID, tx)
  340. # 存儲最後一次獲取的細節
  341. with pending_lock:
  342. pending_data[tx]['response'] = ok_rst
  343. # 錯誤響應
  344. if ok_rst['code'] != '0':
  345. raise RuntimeError("API 返回错误响应", ok_rst)
  346. # ok不一定那麽快獲取到
  347. if ok_rst['data'] is None:
  348. # 每一個之間等待1s
  349. time.sleep(1)
  350. continue
  351. details = ok_rst['data']
  352. status = details['status']
  353. if status != 'fail':
  354. # 有時候不會馬上識別出成交數量
  355. if 'fromTokenDetails' not in details or 'toTokenDetails' not in details:
  356. # 每一個之間等待1s
  357. time.sleep(1)
  358. continue
  359. # 有時候不會馬上識別出成交數量 判斷2
  360. if details['fromTokenDetails'] is None or details['toTokenDetails'] is None:
  361. # 每一個之間等待1s
  362. time.sleep(1)
  363. continue
  364. # 有時候不會馬上識別出gas信息之類的
  365. fileds = ['gasLimit', 'gasPrice', 'gasUsed', 'height']
  366. insufficient = False
  367. for filed in fileds:
  368. if details[filed] == '':
  369. insufficient = True
  370. break
  371. if insufficient:
  372. # 每一個之間等待1s
  373. time.sleep(1)
  374. continue
  375. # 成功獲取之後直接調用更新
  376. with pending_lock:
  377. pending_data[tx]['tx_details'] = details
  378. formated_data = pformat(ok_rst['data'], indent=2) # indent=2 让格式更整齐
  379. logger.info(f"獲取成功: \n{formated_data}")
  380. except Exception as e:
  381. exc_traceback = traceback.format_exc()
  382. logger.error(f"tx數據獲取失敗\n{exc_traceback}")
  383. # traceback.print_exc()
  384. # 每一個之間等待1s
  385. time.sleep(1)
  386. except Exception as e:
  387. exc_traceback = traceback.format_exc()
  388. logger.error(f"pending更新线程发生未知错误\n{exc_traceback}")
  389. # traceback.print_exc()
  390. # --- 餘額平衡綫程 ---
  391. def balance_available_funds_periodically():
  392. """
  393. 每10秒嘗試平衡一次餘額
  394. """
  395. PROPORTION_LIMIT = Decimal(withdraw['proportion_limit']) # 鏈上資金比例低於這個值就會觸發平衡
  396. PROPORTION_TARGET = Decimal(withdraw['proportion_target']) # 鏈上資金占比目標,1表示100%是鏈上資金
  397. BASE_COIN = 'USDT'
  398. BASE_COIN_ADDR = '0xdAC17F958D2ee523a2206206994597C13D831ec7'
  399. CANT_WITHDRAW_STATE_LIST = ['IDLE',
  400. 'CHECK',
  401. 'SELLING_ON_EXCHANGE',
  402. 'WAITING_SELL_CONFIRM',
  403. "BUYING_ON_CHAIN",
  404. "WAITING_CHAIN_CONFIRM",
  405. "WAITING_EXCHANGE_ROLLBACK"
  406. ]
  407. global processing_list
  408. while True:
  409. time.sleep(10)
  410. try:
  411. mexc_available = Decimal(0)
  412. # 交易所餘額讀取
  413. new_account_info = mexc.trade.get_account_info()
  414. balances = new_account_info['balances']
  415. for balance in balances:
  416. if balance['asset'].upper() == BASE_COIN:
  417. mexc_available = Decimal(balance['free'])
  418. mexc_available = mexc_available.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  419. # 鏈上餘額讀取
  420. chain_available = web3.get_erc20_balance(BASE_COIN_ADDR)
  421. chain_available = chain_available.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
  422. # 縂可用餘額(不包括lock的)
  423. total_available = mexc_available + chain_available
  424. # 小於20都懶得做平衡,手續費都不夠
  425. if total_available < Decimal(20):
  426. continue
  427. # 抹茶餘額也要大於20
  428. if mexc_available < Decimal(20):
  429. continue
  430. # 計算鏈上資金佔總體的比例
  431. proportion = chain_available / total_available
  432. proportion = proportion.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
  433. # 判斷比例是否滿足limit,不滿足則先不提現
  434. if proportion > PROPORTION_LIMIT:
  435. continue
  436. # 鏈上應該具備的資金量
  437. chain_available_target = total_available * PROPORTION_TARGET
  438. mexc_should_be_withdrawal_founds = chain_available_target - chain_available
  439. mexc_should_be_withdrawal_founds = mexc_should_be_withdrawal_founds.quantize(Decimal(1), rounding=ROUND_DOWN)
  440. # 如若當前綫程中有未執行完的,先不執行提現
  441. with list_lock:
  442. cant_withdraw = False
  443. for processing in processing_list:
  444. if processing['currentState'] in CANT_WITHDRAW_STATE_LIST:
  445. cant_withdraw = True
  446. break
  447. # 不執行提現判斷
  448. if cant_withdraw:
  449. # formated_processing = pformat(processing_list, indent=2)
  450. # logger.info(f"不執行提現, 因爲: \n{formated_processing}")
  451. continue
  452. if mexc_should_be_withdrawal_founds > 0:
  453. withdrawal_params = {
  454. 'coin': 'USDT',
  455. 'netWork': 'ETH',
  456. 'address': USER_WALLET,
  457. 'amount': mexc_should_be_withdrawal_founds,
  458. }
  459. withdrawal_params_formated = pformat(withdrawal_params, indent=2)
  460. withdrawal_rst = mexc.wallet.post_withdraw(withdrawal_params)
  461. withdrawal_rst_formated = pformat(withdrawal_rst, indent=2)
  462. logger.info(f"[withdrawal]mexc_available={mexc_available}, chain_available={chain_available},proportion={proportion}, mexc_withdrawal={mexc_should_be_withdrawal_founds}")
  463. if "id" not in withdrawal_rst:
  464. msg = f"[withdrawal]交易所提现失败\n參數: {withdrawal_params_formated}\n響應: {withdrawal_rst_formated}"
  465. logger.error(msg)
  466. else:
  467. msg = f"[withdrawal]交易所提现已发送, 进入 {WITHDRAWAL_COOLDOWN} 秒冷却期。\n參數: {withdrawal_params_formated}\n響應: {withdrawal_rst_formated}"
  468. logger.info(msg)
  469. else:
  470. # TODO 這是另一個方向,需要從鏈上往交易所劃轉
  471. pass
  472. except Exception as e:
  473. exc_traceback = traceback.format_exc()
  474. logger.error(f"可用資金平衡綫程发生未知错误\n{exc_traceback}")
  475. # traceback.print_exc()
  476. @app.route('/submit_process', methods=['POST'])
  477. def handle_submit_process():
  478. data = request.get_json()
  479. if not data:
  480. return jsonify({"error": "无效的 JSON 请求体"}), 400
  481. required_fields = ['tx', 'profit', 'profitLimit', 'symbol', 'strategy']
  482. for field in required_fields:
  483. if field not in data:
  484. return jsonify({"error": f"缺少字段: {field}, keys: {data.keys()}"}), 400
  485. try:
  486. profit = Decimal(str(data['profit'])) # 利润
  487. profit_limit = Decimal(str(data['profitLimit'])) # 利润阈值
  488. except (decimal.InvalidOperation, ValueError) as e:
  489. return jsonify({"error": f"请求体中包含无效的小数/整数值: {e}"}), 400
  490. symbol = data['symbol'] # 交易对符号
  491. # 检查此交易对此区块是否处理过
  492. last_trade_block = last_process_info.get(symbol)
  493. with core_lock:
  494. current_block = core_data['block_number']
  495. if last_trade_block:
  496. if current_block - last_trade_block < MIN_BLOCKS_BETWEEN_ARB:
  497. return jsonify({"message": f"已跳过: {symbol} 最近已处理 (区块 {last_trade_block}). 当前区块 {current_block}."}), 200
  498. if profit >= profit_limit:
  499. process_id = str(uuid.uuid4()) # 生成唯一流程ID
  500. process_item = copy.deepcopy(data)
  501. process_item['id'] = process_id
  502. process_item['creationTime'] = get_formatted_timestamp(), # 创建时间
  503. process_item['userWallet'] = USER_WALLET
  504. process_item['userExchangeWallet'] = USER_EXCHANGE_WALLET
  505. process_item['stateFlow'] = [] # 状态流转记录
  506. process_item['currentState'] = "PENDING_START"
  507. # 初始状态更新
  508. add_state_flow_entry(process_item, "RECEIVED", f"流程已接收。利润 {profit} >= 利润阈值 {profit_limit}。开始套利。", "success")
  509. with list_lock:
  510. processing_list.append(process_item)
  511. last_process_info[symbol] = current_block
  512. logger.info(f"已更新 {symbol} 的最后处理信息至区块 {current_block}")
  513. # 在新线程中开始套利过程
  514. arb_thread = threading.Thread(target=arbitrage_process_flow, args=(process_item,), daemon=True)
  515. arb_thread.start()
  516. return jsonify({"message": "套利过程已启动", "process_id": process_id}), 201
  517. else:
  518. return jsonify({"message": f"利润 {profit} 小于利润阈值 {profit_limit}。不处理。"}), 200
  519. @app.route('/processing', methods=['GET'])
  520. def get_processing_list():
  521. """获取正在处理的任务列表"""
  522. with list_lock:
  523. # 返回一个副本,以避免在迭代生成 JSON 响应时列表被修改的问题
  524. return jsonify(list(processing_list))
  525. @app.route('/history', methods=['GET'])
  526. def get_history_list():
  527. """获取已完成的任务历史列表"""
  528. with list_lock:
  529. return jsonify(list(history_process_list))
  530. @app.route('/status', methods=['GET'])
  531. def get_status():
  532. """获取系统状态概览"""
  533. with list_lock:
  534. return jsonify({
  535. "processing_count": len(processing_list), # 正在处理的任务数量
  536. "history_count": len(history_process_list), # 历史任务数量
  537. # "current_nonce_USER_WALLET_if_managed_here": global_nonce_USER_WALLET, # 示例:如果服务器管理此nonce
  538. "last_process_info": last_process_info # 最后处理信息 (如果使用)
  539. })
  540. if __name__ == "__main__":
  541. logger.info("启动核心数据更新线程...")
  542. updater_thread = threading.Thread(target=update_core_data_periodically, daemon=True)
  543. updater_thread.start()
  544. logger.info("启动抹茶数据更新线程...")
  545. updater_thread = threading.Thread(target=update_mexc_data_periodically, daemon=True)
  546. updater_thread.start()
  547. logger.info("启动pending信息獲取线程...")
  548. pending_thread = threading.Thread(target=update_tx_data_periodically, daemon=True)
  549. pending_thread.start()
  550. logger.info("启动餘額平衡线程...")
  551. pending_thread = threading.Thread(target=balance_available_funds_periodically, daemon=True)
  552. pending_thread.start()
  553. logger.info("主线程继续执行,可以执行其他任务或保持运行以观察数据更新。")
  554. logger.info("启动 Flask 套利执行服务器...")
  555. app.run(host='0.0.0.0', port=188, debug=False) # 使用与 price_checker 不同的端口