import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import glob
import json
import re
from datetime import datetime
# 配置matplotlib中文字体显示
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
def extract_timestamp(timestamp_str):
"""从时间戳字符串中提取日期时间"""
if isinstance(timestamp_str, tuple):
timestamp_str = timestamp_str[0]
# 使用正则表达式匹配日期时间格式
match = re.search(r'(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})', timestamp_str)
if match:
return match.group(1)
return timestamp_str
def parse_state_flow(state_flow_str):
"""解析状态流字符串为Python对象"""
if isinstance(state_flow_str, str):
try:
# 替换单引号为双引号以便JSON解析
state_flow_str = state_flow_str.replace("'", '"')
return json.loads(state_flow_str)
except json.JSONDecodeError:
# 如果无法解析为JSON,尝试使用eval(注意:在生产环境中应避免使用eval)
try:
return eval(state_flow_str)
except:
return state_flow_str
return state_flow_str
def read_xlsx_files(directory='xlsx'):
"""读取目录中的所有xlsx文件并合并为一个DataFrame"""
# 获取目录中所有xlsx文件的路径
xlsx_files = glob.glob(os.path.join(directory, '*.xlsx'))
# 排除临时文件
xlsx_files = [f for f in xlsx_files if not os.path.basename(f).startswith('~$')]
if not xlsx_files:
print(f"在 {directory} 目录中未找到xlsx文件")
return None
# 读取并合并所有文件
dfs = []
for file in xlsx_files:
try:
print(f"正在读取文件: {file}")
df = pd.read_excel(file)
dfs.append(df)
except Exception as e:
print(f"读取文件 {file} 时出错: {e}")
if not dfs:
print("没有成功读取任何文件")
return None
# 合并所有DataFrame
combined_df = pd.concat(dfs, ignore_index=True)
# 设置列名
column_names = [
'pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice',
'symbol', 'exchangeOutAmount', 'strategy', 'queryPriceUrl',
'id', 'profit', 'creationTime', 'stateFlow', 'currentState'
]
# 如果列数不匹配,进行调整
if len(combined_df.columns) == len(column_names):
combined_df.columns = column_names
else:
print(f"警告: 列数不匹配。文件列数: {len(combined_df.columns)}, 预期列数: {len(column_names)}")
# 尝试使用前N列
combined_df.columns = column_names[:len(combined_df.columns)]
return combined_df
def read_xlsx_file(file_path):
"""读取指定的xlsx文件并返回一个DataFrame"""
if not os.path.exists(file_path):
print(f"文件不存在: {file_path}")
return None
try:
print(f"正在读取文件: {file_path}")
df = pd.read_excel(file_path)
except Exception as e:
print(f"读取文件 {file_path} 时出错: {e}")
return None
# 设置列名
column_names = [
'pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice',
'symbol', 'exchangeOutAmount', 'strategy', 'queryPriceUrl',
'id', 'profit', 'creationTime', 'stateFlow', 'currentState'
]
# 如果列数不匹配,进行调整
if len(df.columns) == len(column_names):
df.columns = column_names
else:
print(f"警告: 列数不匹配。文件列数: {len(df.columns)}, 预期列数: {len(column_names)}")
# 尝试使用前N列
df.columns = column_names[:len(df.columns)]
return df
def preprocess_data(df):
"""数据预处理"""
if df is None or df.empty:
print("没有数据可供处理")
return None
# 创建副本避免警告
processed_df = df.copy()
# 转换数值列
numeric_columns = ['pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice',
'exchangeOutAmount', 'profit']
for col in numeric_columns:
if col in processed_df.columns:
processed_df[col] = pd.to_numeric(processed_df[col], errors='coerce')
# 处理时间戳
if 'creationTime' in processed_df.columns:
processed_df['creationTime'] = processed_df['creationTime'].apply(extract_timestamp)
processed_df['creationTime'] = pd.to_datetime(processed_df['creationTime'], errors='coerce')
# 计算价差百分比
if all(col in processed_df.columns for col in ['cexPrice', 'dexPrice']):
processed_df['price_diff_pct'] = (processed_df['cexPrice'] - processed_df['dexPrice']) / processed_df['dexPrice'] * 100
# 计算交易金额
if all(col in processed_df.columns for col in ['exchangeOutAmount', 'cexPrice']):
processed_df['trade_value'] = processed_df['exchangeOutAmount'] * processed_df['cexPrice']
return processed_df
def analyze_data(df):
"""基础数据分析"""
if df is None or df.empty:
print("没有数据可供分析")
return None
results = {}
# 数值列定义(用于其他分析)
numeric_columns = ['pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice',
'exchangeOutAmount', 'profit', 'price_diff_pct', 'trade_value']
# 按交易对分组统计
if 'symbol' in df.columns:
symbol_stats = df.groupby('symbol')['profit'].agg(['count', 'sum', 'mean', 'std']).reset_index()
symbol_stats = symbol_stats.sort_values('sum', ascending=False)
results['symbol_stats'] = symbol_stats
# 按策略分组统计
if 'strategy' in df.columns:
strategy_stats = df.groupby('strategy')['profit'].agg(['count', 'sum', 'mean', 'std']).reset_index()
strategy_stats = strategy_stats.sort_values('sum', ascending=False)
results['strategy_stats'] = strategy_stats
# 按状态分组统计
if 'currentState' in df.columns:
state_stats = df.groupby('currentState')['profit'].agg(['count', 'sum', 'mean']).reset_index()
results['state_stats'] = state_stats
# 时间序列分析
if 'creationTime' in df.columns and pd.api.types.is_datetime64_any_dtype(df['creationTime']):
# 按日期分组
df['date'] = df['creationTime'].dt.date
daily_stats = df.groupby('date')['profit'].agg(['count', 'sum', 'mean']).reset_index()
results['daily_stats'] = daily_stats
# 按小时分组
df['hour'] = df['creationTime'].dt.hour
hourly_stats = df.groupby('hour')['profit'].agg(['count', 'sum', 'mean']).reset_index()
results['hourly_stats'] = hourly_stats
return results
def visualize_data(df, results, output_dir='analysis_results'):
"""数据可视化"""
if df is None or df.empty:
print("没有数据可供可视化")
return
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 1. 利润分布直方图
if 'profit' in df.columns:
plt.figure(figsize=(10, 6))
plt.hist(df['profit'].dropna(), bins=30, alpha=0.7)
plt.title('利润分布')
plt.xlabel('利润')
plt.ylabel('频率')
plt.grid(True, alpha=0.3)
plt.savefig(os.path.join(output_dir, 'profit_distribution.png'))
plt.close()
# 2. 价差百分比与利润的散点图
if all(col in df.columns for col in ['price_diff_pct', 'profit']):
plt.figure(figsize=(10, 6))
plt.scatter(df['price_diff_pct'], df['profit'], alpha=0.5)
plt.title('价差百分比与利润的关系')
plt.xlabel('价差百分比 (%)')
plt.ylabel('利润')
plt.grid(True, alpha=0.3)
plt.savefig(os.path.join(output_dir, 'price_diff_vs_profit.png'))
plt.close()
# 3. 每日利润趋势
if 'daily_stats' in results:
daily_stats = results['daily_stats']
plt.figure(figsize=(12, 6))
plt.plot(daily_stats['date'], daily_stats['sum'], marker='o', linestyle='-')
plt.title('每日总利润趋势')
plt.xlabel('日期')
plt.ylabel('总利润')
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(os.path.join(output_dir, 'daily_profit_trend.png'))
plt.close()
# 4. 按交易对的利润总和条形图
if 'symbol_stats' in results:
symbol_stats = results['symbol_stats']
plt.figure(figsize=(12, 8))
plt.barh(symbol_stats['symbol'], symbol_stats['sum'])
plt.title('各交易对总利润')
plt.xlabel('总利润')
plt.ylabel('交易对')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(output_dir, 'symbol_profit.png'))
plt.close()
# 6. 按小时的平均利润
if 'hourly_stats' in results:
hourly_stats = results['hourly_stats']
plt.figure(figsize=(12, 6))
plt.bar(hourly_stats['hour'], hourly_stats['mean'])
plt.title('各小时平均利润')
plt.xlabel('小时')
plt.ylabel('平均利润')
plt.grid(True, alpha=0.3)
plt.xticks(range(24))
plt.tight_layout()
plt.savefig(os.path.join(output_dir, 'hourly_avg_profit.png'))
plt.close()
def generate_report(results, output_dir='analysis_results'):
"""生成分析报告"""
if not results:
print("没有结果可供生成报告")
return
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 生成HTML报告
html_content = """
交易数据分析报告
交易数据分析报告
生成时间: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """
"""
# 添加交易对统计
if 'symbol_stats' in results:
html_content += """
交易对统计
| 交易对 |
交易次数 |
总利润 |
平均利润 |
标准差 |
"""
for _, row in results['symbol_stats'].iterrows():
html_content += f"""
| {row['symbol']} |
{row['count']} |
{row['sum']:.4f} |
{row['mean']:.4f} |
{row['std']:.4f} |
"""
html_content += "
"
# 添加策略统计
if 'strategy_stats' in results:
html_content += """
策略统计
| 策略 |
交易次数 |
总利润 |
平均利润 |
标准差 |
"""
for _, row in results['strategy_stats'].iterrows():
html_content += f"""
| {row['strategy']} |
{row['count']} |
{row['sum']:.4f} |
{row['mean']:.4f} |
{row['std']:.4f} |
"""
html_content += "
"
# 添加状态统计
if 'state_stats' in results:
html_content += """
状态统计
| 状态 |
交易次数 |
总利润 |
平均利润 |
"""
for _, row in results['state_stats'].iterrows():
html_content += f"""
| {row['currentState']} |
{row['count']} |
{row['sum']:.4f} |
{row['mean']:.4f} |
"""
html_content += "
"
# 添加每日统计
if 'daily_stats' in results:
html_content += """
每日统计
| 日期 |
交易次数 |
总利润 |
平均利润 |
"""
for _, row in results['daily_stats'].iterrows():
html_content += f"""
| {row['date']} |
{row['count']} |
{row['sum']:.4f} |
{row['mean']:.4f} |
"""
html_content += "
"
# 添加图表
html_content += """
数据可视化
利润分布
价差百分比与利润的关系
每日总利润趋势
各交易对总利润
各小时平均利润
"""
# 结束HTML
html_content += """
"""
# 写入HTML文件
with open(os.path.join(output_dir, 'analysis_report.html'), 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"分析报告已生成: {os.path.join(output_dir, 'analysis_report.html')}")
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='交易数据分析脚本')
parser.add_argument('file_path', type=str, help='要分析的XLSX文件的路径')
args = parser.parse_args()
print(f"开始读取XLSX文件: {args.file_path}")
df = read_xlsx_file(args.file_path)
if df is not None and not df.empty:
print(f"成功读取数据,共 {len(df)} 行")
print("正在预处理数据...")
processed_df = preprocess_data(df)
print("正在分析数据...")
results = analyze_data(processed_df)
print("正在生成可视化图表...")
visualize_data(processed_df, results)
print("正在生成分析报告...")
generate_report(results)
print("分析完成!")
else:
print("没有数据可供分析")
if __name__ == "__main__":
main()