import requests import decimal import time import threading import json from flask import Flask, render_template, jsonify from collections import deque import logging # --- 配置部分 --- # OpenOcean IN_TOKEN_ADDRESS_BSC = '0x55d398326f99059ff775485246999027b3197955' # USDT on BSC OUT_TOKEN_ADDRESS_BSC = '0x8F0528cE5eF7B51152A59745bEfDD91D97091d2F' AMOUNT_TO_QUERY_HUMAN = decimal.Decimal('1000') # 查询的USDT数量 # Gate.io 现货交易对 GATEIO_SPOT_PAIR = 'ALPACA_USDT' # 示例, 请确保存在且与OUT_TOKEN_ADDRESS_BSC对应 # Gate.io USDT 结算永续合约名称 GATEIO_FUTURES_CONTRACT = 'ALPACA_USDT' # 示例, 请确保存在且与OUT_TOKEN_ADDRESS_BSC对应 # 代理配置 proxies = None # PROXY_HOST = '127.0.0.1' # PROXY_PORT = '7890' # proxies = { # 'http': f'http://{PROXY_HOST}:{PROXY_PORT}', # 'https': f'http://{PROXY_HOST}:{PROXY_PORT}', # } decimal.getcontext().prec = 36 # --- 价格获取函数 --- def get_openocean_price_bsc(in_token_addr, out_token_addr, human_amount_in_decimal_for_request, gas_price=3): # (代码与之前相同, 返回 {"price_usdt_per_out_token": calculated_price} 或 {"error": ...}) chain = 'bsc' url = f'https://open-api.openocean.finance/v4/{chain}/quote' params = { 'inTokenAddress': in_token_addr, 'outTokenAddress': out_token_addr, 'amount': str(human_amount_in_decimal_for_request), 'gasPrice': gas_price, } try: response = requests.get(url, params=params, proxies=proxies, timeout=10) response.raise_for_status() data = response.json() if data.get('code') == 200 and data.get('data'): api_data = data['data'] required_keys = ['inToken', 'outToken', 'inAmount', 'outAmount'] if not all(key in api_data for key in required_keys) or \ api_data['inToken'].get('decimals') is None or \ api_data['outToken'].get('decimals') is None: return {"error": "OO API响应中缺少必要的数据"} in_token_decimals = int(api_data['inToken']['decimals']) out_token_decimals = int(api_data['outToken']['decimals']) atomic_in_amount = decimal.Decimal(api_data['inAmount']) atomic_out_amount = decimal.Decimal(api_data['outAmount']) human_in_amount_used = atomic_in_amount / (decimal.Decimal('10') ** in_token_decimals) human_out_amount_received = atomic_out_amount / (decimal.Decimal('10') ** out_token_decimals) if human_out_amount_received == 0: return {"error": "OO 计算的输出数量为零"} calculated_price = human_in_amount_used / human_out_amount_received return {"price_usdt_per_out_token": calculated_price} else: error_message = data.get('message', 'N/A') if data else '无响应数据' error_code = data.get('code', 'N/A') if data else 'N/A' return {"error": f"OO API 错误 - Code: {error_code}, Msg: {error_message}"} except requests.exceptions.RequestException as e: return {"error": f"OO请求失败: {e}"} except Exception as e: return {"error": f"OO意外错误: {e}"} def get_gateio_spot_price(pair_symbol): # (代码与之前相同, 返回 {"price_base_in_quote": decimal.Decimal(last_price_str)} 或 {"error": ...}) url = f'https://api.gateio.ws/api/v4/spot/tickers' params = {'currency_pair': pair_symbol} try: response = requests.get(url, params=params, proxies=proxies, timeout=10) response.raise_for_status() data = response.json() if isinstance(data, list) and len(data) > 0: ticker_data = data[0] if ticker_data.get('currency_pair') == pair_symbol: last_price_str = ticker_data.get('last') if last_price_str: return {"price_base_in_quote": decimal.Decimal(last_price_str)} else: return {"error": "Gate现货未找到last price"} else: return {"error": f"Gate现货交易对不匹配"} else: error_msg = "未知错误或交易对不存在" if isinstance(data, dict) and data.get('label'): error_msg = f"{data.get('label')}: {data.get('message', '')}" return {"error": f"Gate现货 API 数据格式错误或未找到交易对 {pair_symbol}. Msg: {error_msg}"} except requests.exceptions.RequestException as e: return {"error": f"Gate现货请求失败: {e}"} except Exception as e: return {"error": f"Gate现货意外错误: {e}"} def get_gateio_futures_price(contract_symbol, settle_currency='usdt'): # (代码与之前相同, 返回 {"price_settle_per_base_asset": decimal.Decimal(last_price_str)} 或 {"error": ...}) url = f'https://api.gateio.ws/api/v4/futures/{settle_currency}/tickers' params = {'contract': contract_symbol} try: response = requests.get(url, params=params, proxies=proxies, timeout=10) response.raise_for_status() data = response.json() if isinstance(data, list) and len(data) > 0: ticker_data = data[0] if ticker_data.get('contract') == contract_symbol: last_price_str = ticker_data.get('last') if last_price_str: return {"price_settle_per_base_asset": decimal.Decimal(last_price_str)} else: return {"error": f"Gate期货 ({contract_symbol}) 未找到 'last' price"} else: return {"error": f"Gate期货 API 返回合约与请求 ({contract_symbol}) 不匹配"} else: error_msg = "未知错误或合约不存在" if isinstance(data, dict) and data.get('label'): error_msg = f"{data.get('label')}: {data.get('message', '')}" return {"error": f"Gate期货 API 数据格式错误或未找到合约 {contract_symbol}. Msg: {error_msg}"} except requests.exceptions.RequestException as e: return {"error": f"Gate期货请求失败: {e}"} except Exception as e: return {"error": f"Gate期货意外错误: {e}"} app = Flask(__name__) MAX_HISTORY_POINTS = 86400 latest_data = { "oo_price": None, "gate_spot_price": None, "gate_futures_price": None, "diff_oo_vs_spot_percentage": None, "diff_oo_vs_futures_percentage": None, "diff_spot_vs_futures_percentage": None, # 基差 "oo_error": None, "gate_spot_error": None, "gate_futures_error": None, "last_updated": None, "gate_spot_pair_name": GATEIO_SPOT_PAIR, "gate_futures_contract_name": GATEIO_FUTURES_CONTRACT } # 历史价格: oo, spot, futures historical_prices = deque(maxlen=MAX_HISTORY_POINTS) # 历史价差: oo_vs_spot, oo_vs_futures, spot_vs_futures historical_diffs = deque(maxlen=MAX_HISTORY_POINTS) data_lock = threading.Lock() def calculate_percentage_diff(price_a, price_b): """(price_a - price_b) / price_b * 100, B为基准""" if price_a is not None and price_b is not None and price_b > 0: diff = price_a - price_b return (diff / price_b) * 100 return None def update_prices_periodically(): global latest_data, historical_prices, historical_diffs while True: fetch_time = time.strftime("%H:%M:%S") oo_data = get_openocean_price_bsc(IN_TOKEN_ADDRESS_BSC, OUT_TOKEN_ADDRESS_BSC, AMOUNT_TO_QUERY_HUMAN) spot_data = get_gateio_spot_price(GATEIO_SPOT_PAIR) futures_data = get_gateio_futures_price(GATEIO_FUTURES_CONTRACT) oo_price = oo_data.get("price_usdt_per_out_token") spot_price = spot_data.get("price_base_in_quote") futures_price = futures_data.get("price_settle_per_base_asset") oo_err = oo_data.get("error") spot_err = spot_data.get("error") futures_err = futures_data.get("error") # 价差计算 diff_oo_spot_pct = calculate_percentage_diff(oo_price, spot_price) diff_oo_futures_pct = calculate_percentage_diff(oo_price, futures_price) diff_spot_futures_pct = calculate_percentage_diff(spot_price, futures_price) # 存储历史价格 historical_prices.append({ "timestamp": fetch_time, "oo": float(oo_price) if oo_price else None, "spot": float(spot_price) if spot_price else None, "futures": float(futures_price) if futures_price else None, }) # 存储历史价差 historical_diffs.append({ "timestamp": fetch_time, "oo_vs_spot": float(diff_oo_spot_pct) if diff_oo_spot_pct is not None else None, "oo_vs_futures": float(diff_oo_futures_pct) if diff_oo_futures_pct is not None else None, "spot_vs_futures": float(diff_spot_futures_pct) if diff_spot_futures_pct is not None else None, }) with data_lock: latest_data["oo_price"] = str(oo_price) if oo_price else None latest_data["gate_spot_price"] = str(spot_price) if spot_price else None latest_data["gate_futures_price"] = str(futures_price) if futures_price else None latest_data[ "diff_oo_vs_spot_percentage"] = f"{diff_oo_spot_pct:+.4f}%" if diff_oo_spot_pct is not None else "N/A" latest_data[ "diff_oo_vs_futures_percentage"] = f"{diff_oo_futures_pct:+.4f}%" if diff_oo_futures_pct is not None else "N/A" latest_data[ "diff_spot_vs_futures_percentage"] = f"{diff_spot_futures_pct:+.4f}%" if diff_spot_futures_pct is not None else "N/A" latest_data["oo_error"] = oo_err latest_data["gate_spot_error"] = spot_err latest_data["gate_futures_error"] = futures_err latest_data["last_updated"] = time.strftime("%Y-%m-%d %H:%M:%S") # gate_spot_pair_name & gate_futures_contract_name are set at init print(f"{fetch_time} | OO: {latest_data['oo_price']} (Err:{oo_err}) | " f"Spot({GATEIO_SPOT_PAIR}): {latest_data['gate_spot_price']} (Err:{spot_err}) | " f"Futures({GATEIO_FUTURES_CONTRACT}): {latest_data['gate_futures_price']} (Err:{futures_err}) | " f"D(OoS):{latest_data['diff_oo_vs_spot_percentage']} D(OoF):{latest_data['diff_oo_vs_futures_percentage']} D(SpF):{latest_data['diff_spot_vs_futures_percentage']}") time.sleep(1) @app.route('/') def index(): app_config = { "GATEIO_SPOT_PAIR": GATEIO_SPOT_PAIR, "GATEIO_FUTURES_CONTRACT": GATEIO_FUTURES_CONTRACT, # 也可以把 OUT_TOKEN_ADDRESS_BSC 对应的代币符号传过去,如果能确定的话 "TARGET_ASSET_SYMBOL": GATEIO_SPOT_PAIR.split('_')[0] # 假设命名规则为 ASSET_USDT } return render_template('index.html', config=app_config) @app.route('/data') def get_data(): with data_lock: response_data = { "current": {**latest_data}, # Create a copy "history": { "prices": { "labels": [item['timestamp'] for item in historical_prices], "oo": [item['oo'] for item in historical_prices], "spot": [item['spot'] for item in historical_prices], "futures": [item['futures'] for item in historical_prices], }, "diffs": { # Changed from "difference" to "diffs" to hold multiple series "labels": [item['timestamp'] for item in historical_diffs], # Should be same as price labels "oo_vs_spot": [item['oo_vs_spot'] for item in historical_diffs], "oo_vs_futures": [item['oo_vs_futures'] for item in historical_diffs], "spot_vs_futures": [item['spot_vs_futures'] for item in historical_diffs], } } } # Add config here too, so JS can access it if not from initial render_template response_data["current"]["config_gate_spot_pair"] = GATEIO_SPOT_PAIR response_data["current"]["config_gate_futures_contract"] = GATEIO_FUTURES_CONTRACT response_data["current"]["config_target_asset_symbol"] = GATEIO_SPOT_PAIR.split('_')[0] return jsonify(response_data) if __name__ == "__main__": werkzeug_logger = logging.getLogger('werkzeug') werkzeug_logger.setLevel(logging.ERROR) print(f"监控 OpenOcean ({OUT_TOKEN_ADDRESS_BSC.split('0x')[-1][:6]}...) vs USDT") print(f"Gate.io 现货: {GATEIO_SPOT_PAIR}") print(f"Gate.io 期货: {GATEIO_FUTURES_CONTRACT}") # Basic check for CAT example, adjust if your token is different asset_symbol = GATEIO_SPOT_PAIR.split('_')[0].upper() if asset_symbol == "CAT" and "0x6894CDe390a3f51155ea41Ed24a33A4827d3063D" not in OUT_TOKEN_ADDRESS_BSC: print(f"[警告] 配置的资产符号 'CAT' 可能与 OpenOcean 输出代币地址不完全对应。") elif asset_symbol not in GATEIO_FUTURES_CONTRACT.upper(): print(f"[警告] 现货交易对 '{GATEIO_SPOT_PAIR}' 的基础资产与期货合约 '{GATEIO_FUTURES_CONTRACT}' 可能不匹配。") price_updater_thread = threading.Thread(target=update_prices_periodically, daemon=True) price_updater_thread.start() app.run(debug=True, host='0.0.0.0', port=5000, use_reloader=False)