| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- import requests
- import decimal
- import time
- import threading
- import json
- from flask import Flask, render_template, jsonify
- from collections import deque # 用于高效地存储固定数量的历史数据
- import logging # Import the logging module
# --- Configuration ---
# Gate.io spot pair whose last-trade price is fetched.
GATEIO_SPOT_PAIR = 'CAT_USDT'
# BSC token addresses sent to OpenOcean as the swap input / output token.
IN_TOKEN_ADDRESS_BSC = '0x55d398326f99059ff775485246999027b3197955'
OUT_TOKEN_ADDRESS_BSC = '0x6894CDe390a3f51155ea41Ed24a33A4827d3063D'
# Input amount requested for every OpenOcean quote.
AMOUNT_TO_QUERY_HUMAN = decimal.Decimal('1000')

# HTTP proxy used for every outbound request.
PROXY_HOST = '127.0.0.1'
PROXY_PORT = '7890'
proxies = {
    'http': f'http://{PROXY_HOST}:{PROXY_PORT}',
    'https': f'http://{PROXY_HOST}:{PROXY_PORT}',
}
# proxies = None  # Use this instead when no proxy is needed.

# Decimal contexts are per-thread: a new thread starts with a copy of
# DefaultContext, not of the importing thread's context.  The price maths runs
# in a background thread, so the precision must be set on DefaultContext
# (before any thread is started) as well as on the current context.
decimal.DefaultContext.prec = 36
decimal.getcontext().prec = 36
# --- Price-fetching functions ---
def get_openocean_price_bsc(in_token_addr, out_token_addr, human_amount_in_decimal_for_request, gas_price=3):
    """Ask OpenOcean for a BSC swap quote and derive the implied price.

    Args:
        in_token_addr: Address sent as ``inTokenAddress``.
        out_token_addr: Address sent as ``outTokenAddress``.
        human_amount_in_decimal_for_request: Amount sent (stringified) as
            ``amount``.
        gas_price: Value sent as ``gasPrice``.

    Returns:
        ``{"price_in_per_out": Decimal}`` on success, where the price is the
        quoted input amount divided by the quoted output amount, each scaled
        down by its token's ``decimals``.  On any failure
        ``{"error": str}`` is returned instead; this function does not raise.
    """
    chain = 'bsc'
    endpoint = f'https://open-api.openocean.finance/v4/{chain}/quote'
    query = {
        'inTokenAddress': in_token_addr,
        'outTokenAddress': out_token_addr,
        'amount': str(human_amount_in_decimal_for_request),
        'gasPrice': gas_price,
    }
    try:
        response = requests.get(endpoint, params=query, proxies=proxies, timeout=10)
        response.raise_for_status()
        payload = response.json()

        # Anything other than code 200 with a non-empty "data" is an API error.
        if payload.get('code') != 200 or not payload.get('data'):
            message = payload.get('message', 'N/A') if payload else '无响应数据'
            code = payload.get('code', 'N/A') if payload else 'N/A'
            return {"error": f"OO API 错误 - Code: {code}, Msg: {message}"}

        quote = payload['data']
        in_token = quote.get('inToken')
        out_token = quote.get('outToken')
        if (not in_token or in_token.get('decimals') is None
                or not out_token or out_token.get('decimals') is None
                or quote.get('inAmount') is None
                or quote.get('outAmount') is None):
            return {"error": "OO API响应中缺少必要的数据"}

        # Amounts come back in atomic units; scale by 10**decimals.
        in_scale = int(in_token['decimals'])
        out_scale = int(out_token['decimals'])
        raw_in = decimal.Decimal(quote['inAmount'])
        raw_out = decimal.Decimal(quote['outAmount'])
        ten = decimal.Decimal('10')
        amount_in = raw_in / (ten ** in_scale)
        amount_out = raw_out / (ten ** out_scale)

        if amount_out == 0:
            return {"error": "OO 计算的输出数量为零"}
        return {"price_in_per_out": amount_in / amount_out}
    except requests.exceptions.RequestException as e:
        return {"error": f"OO请求失败: {e}"}
    except Exception as e:
        # Best-effort boundary: malformed payloads are reported, not raised.
        return {"error": f"OO意外错误: {e}"}
def get_gateio_spot_price(pair_symbol):
    """Fetch the last traded price of ``pair_symbol`` from Gate.io's spot tickers.

    Args:
        pair_symbol: Value sent as the ``currency_pair`` query parameter and
            compared against the ticker returned.

    Returns:
        ``{"price_base_in_quote": Decimal}`` built from the first ticker's
        ``last`` field, or ``{"error": str}`` on any failure; this function
        does not raise.
    """
    endpoint = 'https://api.gateio.ws/api/v4/spot/tickers'
    query = {'currency_pair': pair_symbol}
    try:
        response = requests.get(endpoint, params=query, proxies=proxies, timeout=10)
        response.raise_for_status()
        tickers = response.json()

        # A non-empty list of tickers is expected.
        if not isinstance(tickers, list) or not tickers:
            return {"error": "Gate API数据格式错误"}

        ticker = tickers[0]
        if ticker.get('currency_pair') != pair_symbol:
            return {"error": "Gate交易对不匹配"}

        last_price = ticker.get('last')
        if not last_price:
            return {"error": "Gate未找到last price"}
        return {"price_base_in_quote": decimal.Decimal(last_price)}
    except requests.exceptions.RequestException as e:
        return {"error": f"Gate请求失败: {e}"}
    except Exception as e:
        # Best-effort boundary: malformed payloads are reported, not raised.
        return {"error": f"Gate意外错误: {e}"}
app = Flask(__name__)

# --- Shared state and history ---
# Maximum number of points kept for the charts (e.g. the last 5 minutes at one
# point every 5 seconds).
MAX_HISTORY_POINTS = 60

# Most recent sample; every field starts out as None.
latest_data = dict.fromkeys((
    "oo_price",
    "gate_price",
    "difference_percentage",
    "oo_error",
    "gate_error",
    "last_updated",
))

# Bounded deques: once full, appending drops the oldest entry.
# Entries look like {'timestamp': ts, 'oo': price, 'gate': price}.
historical_prices = deque(maxlen=MAX_HISTORY_POINTS)
# Entries look like {'timestamp': ts, 'diff': percentage}.
historical_diff = deque(maxlen=MAX_HISTORY_POINTS)

# Guards the shared state above.
data_lock = threading.Lock()
def _split_result(result, price_key):
    """Return ``(price, error)`` from a fetcher result dict; one of them is None."""
    if "error" in result:
        return None, result['error']
    return result[price_key], None


def _percentage_gap(oo_price, gate_price):
    """Return ``(chart_value, display_text)`` for the OO-vs-Gate gap in percent."""
    if oo_price is None or gate_price is None:
        return None, "N/A"
    if gate_price > 0:
        gap = (oo_price - gate_price) / gate_price * 100
        return float(gap), f"{gap:+.4f}%"
    # Cannot divide by a non-positive reference price.
    return None, "Gate价格为0"


def _as_float(price):
    """Convert a Decimal price to float for charting, keeping None as None."""
    # ``is not None`` rather than truthiness: Decimal('0') is a valid price.
    return float(price) if price is not None else None


def _refresh_prices_once():
    """Fetch both prices once and publish the sample to the shared state."""
    fetch_timestamp_str = time.strftime("%H:%M:%S")  # short label for the charts

    oo_price, oo_error = _split_result(
        get_openocean_price_bsc(
            IN_TOKEN_ADDRESS_BSC,
            OUT_TOKEN_ADDRESS_BSC,
            AMOUNT_TO_QUERY_HUMAN,
        ),
        'price_in_per_out',
    )
    gate_price, gate_error = _split_result(
        get_gateio_spot_price(GATEIO_SPOT_PAIR),
        'price_base_in_quote',
    )
    diff_value, diff_text = _percentage_gap(oo_price, gate_price)

    # A point is recorded every cycle; missing values are stored as None.
    price_point = {
        "timestamp": fetch_timestamp_str,
        "oo": _as_float(oo_price),
        "gate": _as_float(gate_price),
    }
    diff_point = {"timestamp": fetch_timestamp_str, "diff": diff_value}

    # Everything the /data handler reads is mutated under the same lock it
    # takes, so a request never observes a half-written sample.
    with data_lock:
        historical_prices.append(price_point)
        historical_diff.append(diff_point)
        latest_data["oo_price"] = str(oo_price) if oo_price is not None else None
        latest_data["gate_price"] = str(gate_price) if gate_price is not None else None
        latest_data["difference_percentage"] = diff_text
        latest_data["oo_error"] = oo_error
        latest_data["gate_error"] = gate_error
        latest_data["last_updated"] = time.strftime("%Y-%m-%d %H:%M:%S")

    log_oo = f"{oo_price:.6f}" if oo_price is not None else "N/A"
    log_gate = f"{gate_price:.6f}" if gate_price is not None else "N/A"
    print(
        f"Data updated at {fetch_timestamp_str}: OO: {log_oo}, Gate: {log_gate}, Diff: {diff_text}, Errors: OO: {oo_error}, Gate: {gate_error}")


def update_prices_periodically():
    """Poll both price sources forever, publishing a sample every 5 seconds.

    Each cycle queries OpenOcean and Gate.io, appends one point to
    ``historical_prices`` and ``historical_diff``, refreshes ``latest_data``
    and prints a one-line summary.  Never returns; intended to run in a
    background thread.
    """
    while True:
        _refresh_prices_once()
        time.sleep(5)  # polling interval
@app.route('/')
def index():
    """Render and return the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/data')
def get_data():
    """Return the latest sample plus the full history window as JSON.

    Response shape::

        {"current": {...latest_data fields...},
         "history": {"prices": {"labels": [...], "oo": [...], "gate": [...]},
                     "difference": {"labels": [...], "values": [...]}}}

    The client is expected to re-fetch the whole window on every poll, so
    no incremental state is kept here.
    """
    # Snapshot the shared state while holding the lock, each container in a
    # single copy call.  Serialising the live dict, or walking the live deques
    # item by item, could observe a concurrent update from the poller thread
    # (a deque raises RuntimeError if it is mutated during iteration).
    with data_lock:
        current_snapshot = dict(latest_data)
        price_points = list(historical_prices)
        diff_points = list(historical_diff)

    response_data = {
        "current": current_snapshot,
        "history": {
            "prices": {
                "labels": [point['timestamp'] for point in price_points],
                "oo": [point['oo'] for point in price_points],
                "gate": [point['gate'] for point in price_points],
            },
            "difference": {
                "labels": [point['timestamp'] for point in diff_points],
                "values": [point['diff'] for point in diff_points],
            },
        },
    }
    return jsonify(response_data)
if __name__ == "__main__":
    # Silence Werkzeug's per-request access log; only errors are shown.
    werkzeug_logger = logging.getLogger('werkzeug')
    werkzeug_logger.setLevel(logging.ERROR)  # or logging.CRITICAL

    # Daemon thread: the poller must not keep the process alive on exit.
    price_updater_thread = threading.Thread(target=update_prices_periodically, daemon=True)
    price_updater_thread.start()

    # The server listens on all interfaces, so debug mode stays off: the
    # Werkzeug interactive debugger lets any client that can reach the port
    # execute arbitrary Python.  The reloader is disabled so the poller thread
    # is not started twice.
    app.run(debug=False, host='0.0.0.0', port=5000, use_reloader=False)
|