import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import glob import json import re from datetime import datetime def extract_timestamp(timestamp_str): """从时间戳字符串中提取日期时间""" if isinstance(timestamp_str, tuple): timestamp_str = timestamp_str[0] # 使用正则表达式匹配日期时间格式 match = re.search(r'(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})', timestamp_str) if match: return match.group(1) return timestamp_str def parse_state_flow(state_flow_str): """解析状态流字符串为Python对象""" if isinstance(state_flow_str, str): try: # 替换单引号为双引号以便JSON解析 state_flow_str = state_flow_str.replace("'", '"') return json.loads(state_flow_str) except json.JSONDecodeError: # 如果无法解析为JSON,尝试使用eval(注意:在生产环境中应避免使用eval) try: return eval(state_flow_str) except: return state_flow_str return state_flow_str def read_xlsx_files(directory='xlsx'): """读取目录中的所有xlsx文件并合并为一个DataFrame""" # 获取目录中所有xlsx文件的路径 xlsx_files = glob.glob(os.path.join(directory, '*.xlsx')) # 排除临时文件 xlsx_files = [f for f in xlsx_files if not os.path.basename(f).startswith('~$')] if not xlsx_files: print(f"在 {directory} 目录中未找到xlsx文件") return None # 读取并合并所有文件 dfs = [] for file in xlsx_files: try: print(f"正在读取文件: {file}") df = pd.read_excel(file) dfs.append(df) except Exception as e: print(f"读取文件 {file} 时出错: {e}") if not dfs: print("没有成功读取任何文件") return None # 合并所有DataFrame combined_df = pd.concat(dfs, ignore_index=True) # 设置列名 column_names = [ 'pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice', 'symbol', 'exchangeOutAmount', 'strategy', 'queryPriceUrl', 'id', 'profit', 'creationTime', 'stateFlow', 'currentState' ] # 如果列数不匹配,进行调整 if len(combined_df.columns) == len(column_names): combined_df.columns = column_names else: print(f"警告: 列数不匹配。文件列数: {len(combined_df.columns)}, 预期列数: {len(column_names)}") # 尝试使用前N列 combined_df.columns = column_names[:len(combined_df.columns)] return combined_df def read_xlsx_file(file_path): """读取指定的xlsx文件并返回一个DataFrame""" if not os.path.exists(file_path): print(f"文件不存在: {file_path}") return None try: print(f"正在读取文件: {file_path}") df = pd.read_excel(file_path) except Exception as e: print(f"读取文件 {file_path} 时出错: {e}") return None # 设置列名 column_names = [ 'pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice', 'symbol', 'exchangeOutAmount', 'strategy', 'queryPriceUrl', 'id', 'profit', 'creationTime', 'stateFlow', 'currentState' ] # 如果列数不匹配,进行调整 if len(df.columns) == len(column_names): df.columns = column_names else: print(f"警告: 列数不匹配。文件列数: {len(df.columns)}, 预期列数: {len(column_names)}") # 尝试使用前N列 df.columns = column_names[:len(df.columns)] return df def preprocess_data(df): """数据预处理""" if df is None or df.empty: print("没有数据可供处理") return None # 创建副本避免警告 processed_df = df.copy() # 转换数值列 numeric_columns = ['pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice', 'exchangeOutAmount', 'profit'] for col in numeric_columns: if col in processed_df.columns: processed_df[col] = pd.to_numeric(processed_df[col], errors='coerce') # 处理时间戳 if 'creationTime' in processed_df.columns: processed_df['creationTime'] = processed_df['creationTime'].apply(extract_timestamp) processed_df['creationTime'] = pd.to_datetime(processed_df['creationTime'], errors='coerce') # 计算价差百分比 if all(col in processed_df.columns for col in ['cexPrice', 'dexPrice']): processed_df['price_diff_pct'] = (processed_df['cexPrice'] - processed_df['dexPrice']) / processed_df['dexPrice'] * 100 # 计算交易金额 if all(col in processed_df.columns for col in ['exchangeOutAmount', 'cexPrice']): processed_df['trade_value'] = processed_df['exchangeOutAmount'] * processed_df['cexPrice'] return processed_df def analyze_data(df): """基础数据分析""" if df is None or df.empty: print("没有数据可供分析") return None results = {} # 基础统计分析 numeric_columns = ['pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice', 'exchangeOutAmount', 'profit', 'price_diff_pct', 'trade_value'] # 计算基础统计量 stats = df[numeric_columns].describe() results['basic_stats'] = stats # 按交易对分组统计 if 'symbol' in df.columns: symbol_stats = df.groupby('symbol')['profit'].agg(['count', 'sum', 'mean', 'std']).reset_index() symbol_stats = symbol_stats.sort_values('sum', ascending=False) results['symbol_stats'] = symbol_stats # 按策略分组统计 if 'strategy' in df.columns: strategy_stats = df.groupby('strategy')['profit'].agg(['count', 'sum', 'mean', 'std']).reset_index() strategy_stats = strategy_stats.sort_values('sum', ascending=False) results['strategy_stats'] = strategy_stats # 按状态分组统计 if 'currentState' in df.columns: state_stats = df.groupby('currentState')['profit'].agg(['count', 'sum', 'mean']).reset_index() results['state_stats'] = state_stats # 时间序列分析 if 'creationTime' in df.columns and pd.api.types.is_datetime64_any_dtype(df['creationTime']): # 按日期分组 df['date'] = df['creationTime'].dt.date daily_stats = df.groupby('date')['profit'].agg(['count', 'sum', 'mean']).reset_index() results['daily_stats'] = daily_stats # 按小时分组 df['hour'] = df['creationTime'].dt.hour hourly_stats = df.groupby('hour')['profit'].agg(['count', 'sum', 'mean']).reset_index() results['hourly_stats'] = hourly_stats # 相关性分析 if len(numeric_columns) > 1: correlation = df[numeric_columns].corr() results['correlation'] = correlation return results def visualize_data(df, results, output_dir='analysis_results'): """数据可视化""" if df is None or df.empty: print("没有数据可供可视化") return # 创建输出目录 os.makedirs(output_dir, exist_ok=True) # 1. 利润分布直方图 if 'profit' in df.columns: plt.figure(figsize=(10, 6)) plt.hist(df['profit'].dropna(), bins=30, alpha=0.7) plt.title('利润分布') plt.xlabel('利润') plt.ylabel('频率') plt.grid(True, alpha=0.3) plt.savefig(os.path.join(output_dir, 'profit_distribution.png')) plt.close() # 2. 价差百分比与利润的散点图 if all(col in df.columns for col in ['price_diff_pct', 'profit']): plt.figure(figsize=(10, 6)) plt.scatter(df['price_diff_pct'], df['profit'], alpha=0.5) plt.title('价差百分比与利润的关系') plt.xlabel('价差百分比 (%)') plt.ylabel('利润') plt.grid(True, alpha=0.3) plt.savefig(os.path.join(output_dir, 'price_diff_vs_profit.png')) plt.close() # 3. 每日利润趋势 if 'daily_stats' in results: daily_stats = results['daily_stats'] plt.figure(figsize=(12, 6)) plt.plot(daily_stats['date'], daily_stats['sum'], marker='o', linestyle='-') plt.title('每日总利润趋势') plt.xlabel('日期') plt.ylabel('总利润') plt.grid(True, alpha=0.3) plt.xticks(rotation=45) plt.tight_layout() plt.savefig(os.path.join(output_dir, 'daily_profit_trend.png')) plt.close() # 4. 按交易对的利润总和条形图 if 'symbol_stats' in results: symbol_stats = results['symbol_stats'] plt.figure(figsize=(12, 8)) plt.barh(symbol_stats['symbol'], symbol_stats['sum']) plt.title('各交易对总利润') plt.xlabel('总利润') plt.ylabel('交易对') plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(os.path.join(output_dir, 'symbol_profit.png')) plt.close() # 5. 相关性热图 if 'correlation' in results: correlation = results['correlation'] plt.figure(figsize=(12, 10)) plt.imshow(correlation, cmap='coolwarm', vmin=-1, vmax=1) plt.colorbar() plt.title('特征相关性热图') plt.xticks(range(len(correlation.columns)), correlation.columns, rotation=90) plt.yticks(range(len(correlation.columns)), correlation.columns) # 在热图上添加相关系数值 for i in range(len(correlation.columns)): for j in range(len(correlation.columns)): plt.text(j, i, f'{correlation.iloc[i, j]:.2f}', ha='center', va='center', color='white' if abs(correlation.iloc[i, j]) > 0.5 else 'black') plt.tight_layout() plt.savefig(os.path.join(output_dir, 'correlation_heatmap.png')) plt.close() # 6. 按小时的平均利润 if 'hourly_stats' in results: hourly_stats = results['hourly_stats'] plt.figure(figsize=(12, 6)) plt.bar(hourly_stats['hour'], hourly_stats['mean']) plt.title('各小时平均利润') plt.xlabel('小时') plt.ylabel('平均利润') plt.grid(True, alpha=0.3) plt.xticks(range(24)) plt.tight_layout() plt.savefig(os.path.join(output_dir, 'hourly_avg_profit.png')) plt.close() def generate_report(results, output_dir='analysis_results'): """生成分析报告""" if not results: print("没有结果可供生成报告") return # 创建输出目录 os.makedirs(output_dir, exist_ok=True) # 生成HTML报告 html_content = """ 交易数据分析报告

交易数据分析报告

生成时间: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """

""" # 添加基础统计信息 if 'basic_stats' in results: html_content += """

基础统计信息

""" for col in results['basic_stats'].columns: html_content += f"" html_content += "" for idx, row in results['basic_stats'].iterrows(): html_content += f"" for col in results['basic_stats'].columns: html_content += f"" html_content += "" html_content += "
指标{col}
{idx}{row[col]:.4f}
" # 添加交易对统计 if 'symbol_stats' in results: html_content += """

交易对统计

""" for _, row in results['symbol_stats'].iterrows(): html_content += f""" """ html_content += "
交易对 交易次数 总利润 平均利润 标准差
{row['symbol']} {row['count']} {row['sum']:.4f} {row['mean']:.4f} {row['std']:.4f}
" # 添加策略统计 if 'strategy_stats' in results: html_content += """

策略统计

""" for _, row in results['strategy_stats'].iterrows(): html_content += f""" """ html_content += "
策略 交易次数 总利润 平均利润 标准差
{row['strategy']} {row['count']} {row['sum']:.4f} {row['mean']:.4f} {row['std']:.4f}
" # 添加状态统计 if 'state_stats' in results: html_content += """

状态统计

""" for _, row in results['state_stats'].iterrows(): html_content += f""" """ html_content += "
状态 交易次数 总利润 平均利润
{row['currentState']} {row['count']} {row['sum']:.4f} {row['mean']:.4f}
" # 添加每日统计 if 'daily_stats' in results: html_content += """

每日统计

""" for _, row in results['daily_stats'].iterrows(): html_content += f""" """ html_content += "
日期 交易次数 总利润 平均利润
{row['date']} {row['count']} {row['sum']:.4f} {row['mean']:.4f}
" # 添加图表 html_content += """

数据可视化

利润分布

利润分布

价差百分比与利润的关系

价差百分比与利润的关系

每日总利润趋势

每日总利润趋势

各交易对总利润

各交易对总利润

特征相关性热图

特征相关性热图

各小时平均利润

各小时平均利润
""" # 结束HTML html_content += """ """ # 写入HTML文件 with open(os.path.join(output_dir, 'analysis_report.html'), 'w', encoding='utf-8') as f: f.write(html_content) print(f"分析报告已生成: {os.path.join(output_dir, 'analysis_report.html')}") def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='交易数据分析脚本') parser.add_argument('file_path', type=str, help='要分析的XLSX文件的路径') args = parser.parse_args() print(f"开始读取XLSX文件: {args.file_path}") df = read_xlsx_file(args.file_path) if df is not None and not df.empty: print(f"成功读取数据,共 {len(df)} 行") print("正在预处理数据...") processed_df = preprocess_data(df) print("正在分析数据...") results = analyze_data(processed_df) print("正在生成可视化图表...") visualize_data(processed_df, results) print("正在生成分析报告...") generate_report(results) print("分析完成!") else: print("没有数据可供分析") if __name__ == "__main__": main()