"""Flask dashboard comparing an OpenOcean (BSC) quote with the Gate.io spot price.

A background thread polls both sources every few seconds, keeps the latest
values plus a bounded history, and the ``/data`` endpoint serves all of it as
JSON for the front end rendered from ``index.html``.
"""

import decimal
import json
import threading
import time
from collections import deque  # bounded FIFO used for the rolling history

import requests
from flask import Flask, jsonify, render_template

# --- Configuration ---
GATEIO_SPOT_PAIR = 'MUBARAK_USDT'
IN_TOKEN_ADDRESS_BSC = '0x55d398326f99059ff775485246999027b3197955'
OUT_TOKEN_ADDRESS_BSC = '0x5C85D6C6825aB4032337F11Ee92a72DF936b46F6'
AMOUNT_TO_QUERY_HUMAN = decimal.Decimal('1000')

PROXY_HOST = '127.0.0.1'
PROXY_PORT = '7890'
proxies = {
    'http': f'http://{PROXY_HOST}:{PROXY_PORT}',
    'https': f'http://{PROXY_HOST}:{PROXY_PORT}',
}
# Set ``proxies = None`` here if no proxy is needed.

# The decimal context is per-thread: ``getcontext()`` only affects the thread
# that imports this module. New threads start from ``DefaultContext``, so it
# must be configured as well (before any thread is started) for the background
# updater to compute with the same precision.
decimal.DefaultContext.prec = 36
decimal.getcontext().prec = 36


# --- Price fetchers ---
def get_openocean_price_bsc(in_token_addr, out_token_addr,
                            human_amount_in_decimal_for_request, gas_price=3):
    """Query the OpenOcean v4 quote API on BSC and derive a unit price.

    Args:
        in_token_addr: Contract address of the token being sold.
        out_token_addr: Contract address of the token being bought.
        human_amount_in_decimal_for_request: Amount of the input token to
            quote; sent to the API as ``str(...)``.
        gas_price: Value forwarded as the ``gasPrice`` query parameter.

    Returns:
        ``{"price_in_per_out": Decimal}`` (input tokens per one output token,
        computed from the atomic amounts and token decimals in the response)
        on success, otherwise ``{"error": str}``. This function never raises;
        every failure is reported through the ``"error"`` key.
    """
    chain = 'bsc'
    url = f'https://open-api.openocean.finance/v4/{chain}/quote'
    params = {
        'inTokenAddress': in_token_addr,
        'outTokenAddress': out_token_addr,
        'amount': str(human_amount_in_decimal_for_request),
        'gasPrice': gas_price,
    }
    try:
        response = requests.get(url, params=params, proxies=proxies, timeout=10)
        response.raise_for_status()
        data = response.json()

        if not (data.get('code') == 200 and data.get('data')):
            error_message = data.get('message', 'N/A') if data else '无响应数据'
            error_code = data.get('code', 'N/A') if data else 'N/A'
            return {"error": f"OO API 错误 - Code: {error_code}, Msg: {error_message}"}

        api_data = data['data']
        in_token = api_data.get('inToken')
        out_token = api_data.get('outToken')
        has_required_fields = (
            in_token and in_token.get('decimals') is not None
            and out_token and out_token.get('decimals') is not None
            and api_data.get('inAmount') is not None
            and api_data.get('outAmount') is not None
        )
        if not has_required_fields:
            return {"error": "OO API响应中缺少必要的数据"}

        # Amounts come back in atomic units; scale them by each token's
        # decimals to obtain human-readable quantities.
        ten = decimal.Decimal('10')
        human_in = decimal.Decimal(api_data['inAmount']) / (ten ** int(in_token['decimals']))
        human_out = decimal.Decimal(api_data['outAmount']) / (ten ** int(out_token['decimals']))
        if human_out == 0:
            return {"error": "OO 计算的输出数量为零"}

        return {"price_in_per_out": human_in / human_out}
    except requests.exceptions.RequestException as e:
        return {"error": f"OO请求失败: {e}"}
    except Exception as e:
        # Boundary handler: malformed payloads (wrong types, bad numbers) are
        # reported to the caller as an error dict instead of raising.
        return {"error": f"OO意外错误: {e}"}


def get_gateio_spot_price(pair_symbol):
    """Fetch the last traded price of ``pair_symbol`` from Gate.io spot tickers.

    Args:
        pair_symbol: Currency pair identifier sent as ``currency_pair``.

    Returns:
        ``{"price_base_in_quote": Decimal}`` on success, otherwise
        ``{"error": str}``. This function never raises.
    """
    url = 'https://api.gateio.ws/api/v4/spot/tickers'
    params = {'currency_pair': pair_symbol}
    try:
        response = requests.get(url, params=params, proxies=proxies, timeout=10)
        response.raise_for_status()
        data = response.json()

        if not (isinstance(data, list) and data):
            return {"error": "Gate API数据格式错误"}

        ticker_data = data[0]
        if ticker_data.get('currency_pair') != pair_symbol:
            return {"error": "Gate交易对不匹配"}

        last_price_str = ticker_data.get('last')
        if not last_price_str:
            return {"error": "Gate未找到last price"}

        return {"price_base_in_quote": decimal.Decimal(last_price_str)}
    except requests.exceptions.RequestException as e:
        return {"error": f"Gate请求失败: {e}"}
    except Exception as e:
        # Boundary handler: unexpected payload shapes become an error dict.
        return {"error": f"Gate意外错误: {e}"}


app = Flask(__name__)

# --- Shared state and history ---
# Maximum number of points kept per chart (60 points at one point every
# 5 seconds is a 5-minute window).
MAX_HISTORY_POINTS = 60

latest_data = {
    "oo_price": None,
    "gate_price": None,
    "difference_percentage": None,
    "oo_error": None,
    "gate_error": None,
    "last_updated": None
}

# deques with maxlen drop the oldest entry automatically in O(1).
historical_prices = deque(maxlen=MAX_HISTORY_POINTS)  # {'timestamp', 'oo', 'gate'}
historical_diff = deque(maxlen=MAX_HISTORY_POINTS)    # {'timestamp', 'diff'}

# Guards latest_data and both history deques; every reader and writer of
# those three objects must hold it.
data_lock = threading.Lock()


def _refresh_once():
    """Fetch both prices once and publish the result to the shared state."""
    fetch_timestamp_str = time.strftime("%H:%M:%S")  # short label for charts

    # Network I/O happens outside the lock so HTTP handlers are never blocked
    # behind a slow upstream request.
    oo_price_data = get_openocean_price_bsc(
        IN_TOKEN_ADDRESS_BSC,
        OUT_TOKEN_ADDRESS_BSC,
        AMOUNT_TO_QUERY_HUMAN
    )
    gate_price_data = get_gateio_spot_price(GATEIO_SPOT_PAIR)

    current_oo_price_decimal = None
    current_gate_price_decimal = None
    current_oo_error = None
    current_gate_error = None

    if "error" not in oo_price_data:
        current_oo_price_decimal = oo_price_data['price_in_per_out']
    else:
        current_oo_error = oo_price_data['error']

    if "error" not in gate_price_data:
        current_gate_price_decimal = gate_price_data['price_base_in_quote']
    else:
        current_gate_error = gate_price_data['error']

    have_oo = current_oo_price_decimal is not None
    have_gate = current_gate_price_decimal is not None

    # The spread is only defined when both prices are available and the
    # reference (Gate) price is positive.
    diff_percentage_val = None
    diff_percentage_str = "N/A"
    if have_oo and have_gate:
        if current_gate_price_decimal > 0:
            difference = current_oo_price_decimal - current_gate_price_decimal
            percentage_diff_decimal = (difference / current_gate_price_decimal) * 100
            diff_percentage_val = float(percentage_diff_decimal)       # for charts
            diff_percentage_str = f"{percentage_diff_decimal:+.4f}%"   # for display
        else:
            diff_percentage_str = "Gate价格为0"

    # Charts consume plain JSON numbers, so history stores floats. Compare
    # against None explicitly: a price of exactly 0 is valid data, not a
    # missing value.
    price_point = {
        "timestamp": fetch_timestamp_str,
        "oo": float(current_oo_price_decimal) if have_oo else None,
        "gate": float(current_gate_price_decimal) if have_gate else None
    }
    diff_point = {
        "timestamp": fetch_timestamp_str,
        "diff": diff_percentage_val
    }

    # Publish history and the latest snapshot atomically so a concurrent
    # /data request never iterates a deque while it is being mutated and
    # never sees the two histories out of step.
    with data_lock:
        historical_prices.append(price_point)
        historical_diff.append(diff_point)
        latest_data["oo_price"] = str(current_oo_price_decimal) if have_oo else None
        latest_data["gate_price"] = str(current_gate_price_decimal) if have_gate else None
        latest_data["difference_percentage"] = diff_percentage_str
        latest_data["oo_error"] = current_oo_error
        latest_data["gate_error"] = current_gate_error
        latest_data["last_updated"] = time.strftime("%Y-%m-%d %H:%M:%S")

    log_oo = f"{current_oo_price_decimal:.6f}" if have_oo else "N/A"
    log_gate = f"{current_gate_price_decimal:.6f}" if have_gate else "N/A"
    print(
        f"Data updated at {fetch_timestamp_str}: OO: {log_oo}, Gate: {log_gate}, Diff: {diff_percentage_str}, Errors: OO: {current_oo_error}, Gate: {current_gate_error}")


def update_prices_periodically():
    """Poll both price sources forever, refreshing shared state every 5 seconds.

    Intended to run in a background thread; it never returns.
    """
    while True:
        _refresh_once()
        time.sleep(5)  # polling interval


@app.route('/')
def index():
    """Serve the dashboard page."""
    return render_template('index.html')


@app.route('/data')
def get_data():
    """Return the latest values and the full history window as JSON.

    The front end is expected to re-fetch this endpoint periodically and
    receives the complete history each time rather than incremental updates.
    """
    # Take a consistent snapshot under the lock, then serialise outside it.
    with data_lock:
        current = dict(latest_data)
        price_points = list(historical_prices)
        diff_points = list(historical_diff)

    response_data = {
        "current": current,
        "history": {
            "prices": {
                "labels": [item['timestamp'] for item in price_points],
                "oo": [item['oo'] for item in price_points],
                "gate": [item['gate'] for item in price_points]
            },
            "difference": {
                "labels": [item['timestamp'] for item in diff_points],
                "values": [item['diff'] for item in diff_points]
            }
        }
    }
    return jsonify(response_data)


if __name__ == "__main__":
    price_updater_thread = threading.Thread(target=update_prices_periodically, daemon=True)
    price_updater_thread.start()
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # Werkzeug interactive debugger on every network interface. Disable debug
    # or bind to 127.0.0.1 before running this anywhere untrusted.
    # use_reloader=False keeps the updater thread from being started twice.
    app.run(debug=True, host='0.0.0.0', port=5000, use_reloader=False)