| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470 |
- import os
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- import glob
- import json
- import re
- from datetime import datetime
- # 配置matplotlib中文字体显示
- plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans'] # 用来正常显示中文标签
- plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
- def extract_timestamp(timestamp_str):
- """从时间戳字符串中提取日期时间"""
- if isinstance(timestamp_str, tuple):
- timestamp_str = timestamp_str[0]
-
- # 使用正则表达式匹配日期时间格式
- match = re.search(r'(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})', timestamp_str)
- if match:
- return match.group(1)
- return timestamp_str
- def parse_state_flow(state_flow_str):
- """解析状态流字符串为Python对象"""
- if isinstance(state_flow_str, str):
- try:
- # 替换单引号为双引号以便JSON解析
- state_flow_str = state_flow_str.replace("'", '"')
- return json.loads(state_flow_str)
- except json.JSONDecodeError:
- # 如果无法解析为JSON,尝试使用eval(注意:在生产环境中应避免使用eval)
- try:
- return eval(state_flow_str)
- except:
- return state_flow_str
- return state_flow_str
- def read_xlsx_files(directory='xlsx'):
- """读取目录中的所有xlsx文件并合并为一个DataFrame"""
- # 获取目录中所有xlsx文件的路径
- xlsx_files = glob.glob(os.path.join(directory, '*.xlsx'))
-
- # 排除临时文件
- xlsx_files = [f for f in xlsx_files if not os.path.basename(f).startswith('~$')]
-
- if not xlsx_files:
- print(f"在 {directory} 目录中未找到xlsx文件")
- return None
-
- # 读取并合并所有文件
- dfs = []
- for file in xlsx_files:
- try:
- print(f"正在读取文件: {file}")
- df = pd.read_excel(file)
- dfs.append(df)
- except Exception as e:
- print(f"读取文件 {file} 时出错: {e}")
-
- if not dfs:
- print("没有成功读取任何文件")
- return None
-
- # 合并所有DataFrame
- combined_df = pd.concat(dfs, ignore_index=True)
-
- # 设置列名
- column_names = [
- 'pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice',
- 'symbol', 'exchangeOutAmount', 'strategy', 'queryPriceUrl',
- 'id', 'profit', 'creationTime', 'stateFlow', 'currentState'
- ]
-
- # 如果列数不匹配,进行调整
- if len(combined_df.columns) == len(column_names):
- combined_df.columns = column_names
- else:
- print(f"警告: 列数不匹配。文件列数: {len(combined_df.columns)}, 预期列数: {len(column_names)}")
- # 尝试使用前N列
- combined_df.columns = column_names[:len(combined_df.columns)]
-
- return combined_df
- def read_xlsx_file(file_path):
- """读取指定的xlsx文件并返回一个DataFrame"""
- if not os.path.exists(file_path):
- print(f"文件不存在: {file_path}")
- return None
-
- try:
- print(f"正在读取文件: {file_path}")
- df = pd.read_excel(file_path)
- except Exception as e:
- print(f"读取文件 {file_path} 时出错: {e}")
- return None
-
- # 设置列名
- column_names = [
- 'pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice',
- 'symbol', 'exchangeOutAmount', 'strategy', 'queryPriceUrl',
- 'id', 'profit', 'creationTime', 'stateFlow', 'currentState'
- ]
-
- # 如果列数不匹配,进行调整
- if len(df.columns) == len(column_names):
- df.columns = column_names
- else:
- print(f"警告: 列数不匹配。文件列数: {len(df.columns)}, 预期列数: {len(column_names)}")
- # 尝试使用前N列
- df.columns = column_names[:len(df.columns)]
-
- return df
- def preprocess_data(df):
- """数据预处理"""
- if df is None or df.empty:
- print("没有数据可供处理")
- return None
-
- # 创建副本避免警告
- processed_df = df.copy()
-
- # 转换数值列
- numeric_columns = ['pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice',
- 'exchangeOutAmount', 'profit']
-
- for col in numeric_columns:
- if col in processed_df.columns:
- processed_df[col] = pd.to_numeric(processed_df[col], errors='coerce')
-
- # 处理时间戳
- if 'creationTime' in processed_df.columns:
- processed_df['creationTime'] = processed_df['creationTime'].apply(extract_timestamp)
- processed_df['creationTime'] = pd.to_datetime(processed_df['creationTime'], errors='coerce')
-
- # 计算价差百分比
- if all(col in processed_df.columns for col in ['cexPrice', 'dexPrice']):
- processed_df['price_diff_pct'] = (processed_df['cexPrice'] - processed_df['dexPrice']) / processed_df['dexPrice'] * 100
-
- # 计算交易金额
- if all(col in processed_df.columns for col in ['exchangeOutAmount', 'cexPrice']):
- processed_df['trade_value'] = processed_df['exchangeOutAmount'] * processed_df['cexPrice']
-
- return processed_df
- def analyze_data(df):
- """基础数据分析"""
- if df is None or df.empty:
- print("没有数据可供分析")
- return None
-
- results = {}
-
- # 数值列定义(用于其他分析)
- numeric_columns = ['pct', 'openLimit', 'closeLimit', 'cexPrice', 'dexPrice',
- 'exchangeOutAmount', 'profit', 'price_diff_pct', 'trade_value']
-
- # 按交易对分组统计
- if 'symbol' in df.columns:
- symbol_stats = df.groupby('symbol')['profit'].agg(['count', 'sum', 'mean', 'std']).reset_index()
- symbol_stats = symbol_stats.sort_values('sum', ascending=False)
- results['symbol_stats'] = symbol_stats
-
- # 按策略分组统计
- if 'strategy' in df.columns:
- strategy_stats = df.groupby('strategy')['profit'].agg(['count', 'sum', 'mean', 'std']).reset_index()
- strategy_stats = strategy_stats.sort_values('sum', ascending=False)
- results['strategy_stats'] = strategy_stats
-
- # 按状态分组统计
- if 'currentState' in df.columns:
- state_stats = df.groupby('currentState')['profit'].agg(['count', 'sum', 'mean']).reset_index()
- results['state_stats'] = state_stats
-
- # 时间序列分析
- if 'creationTime' in df.columns and pd.api.types.is_datetime64_any_dtype(df['creationTime']):
- # 按日期分组
- df['date'] = df['creationTime'].dt.date
- daily_stats = df.groupby('date')['profit'].agg(['count', 'sum', 'mean']).reset_index()
- results['daily_stats'] = daily_stats
-
- # 按小时分组
- df['hour'] = df['creationTime'].dt.hour
- hourly_stats = df.groupby('hour')['profit'].agg(['count', 'sum', 'mean']).reset_index()
- results['hourly_stats'] = hourly_stats
-
- return results
- def visualize_data(df, results, output_dir='analysis_results'):
- """数据可视化"""
- if df is None or df.empty:
- print("没有数据可供可视化")
- return
-
- # 创建输出目录
- os.makedirs(output_dir, exist_ok=True)
-
- # 1. 利润分布直方图
- if 'profit' in df.columns:
- plt.figure(figsize=(10, 6))
- plt.hist(df['profit'].dropna(), bins=30, alpha=0.7)
- plt.title('利润分布')
- plt.xlabel('利润')
- plt.ylabel('频率')
- plt.grid(True, alpha=0.3)
- plt.savefig(os.path.join(output_dir, 'profit_distribution.png'))
- plt.close()
-
- # 2. 价差百分比与利润的散点图
- if all(col in df.columns for col in ['price_diff_pct', 'profit']):
- plt.figure(figsize=(10, 6))
- plt.scatter(df['price_diff_pct'], df['profit'], alpha=0.5)
- plt.title('价差百分比与利润的关系')
- plt.xlabel('价差百分比 (%)')
- plt.ylabel('利润')
- plt.grid(True, alpha=0.3)
- plt.savefig(os.path.join(output_dir, 'price_diff_vs_profit.png'))
- plt.close()
-
- # 3. 每日利润趋势
- if 'daily_stats' in results:
- daily_stats = results['daily_stats']
- plt.figure(figsize=(12, 6))
- plt.plot(daily_stats['date'], daily_stats['sum'], marker='o', linestyle='-')
- plt.title('每日总利润趋势')
- plt.xlabel('日期')
- plt.ylabel('总利润')
- plt.grid(True, alpha=0.3)
- plt.xticks(rotation=45)
- plt.tight_layout()
- plt.savefig(os.path.join(output_dir, 'daily_profit_trend.png'))
- plt.close()
-
- # 4. 按交易对的利润总和条形图
- if 'symbol_stats' in results:
- symbol_stats = results['symbol_stats']
- plt.figure(figsize=(12, 8))
- plt.barh(symbol_stats['symbol'], symbol_stats['sum'])
- plt.title('各交易对总利润')
- plt.xlabel('总利润')
- plt.ylabel('交易对')
- plt.grid(True, alpha=0.3)
- plt.tight_layout()
- plt.savefig(os.path.join(output_dir, 'symbol_profit.png'))
- plt.close()
-
- # 6. 按小时的平均利润
- if 'hourly_stats' in results:
- hourly_stats = results['hourly_stats']
- plt.figure(figsize=(12, 6))
- plt.bar(hourly_stats['hour'], hourly_stats['mean'])
- plt.title('各小时平均利润')
- plt.xlabel('小时')
- plt.ylabel('平均利润')
- plt.grid(True, alpha=0.3)
- plt.xticks(range(24))
- plt.tight_layout()
- plt.savefig(os.path.join(output_dir, 'hourly_avg_profit.png'))
- plt.close()
- def generate_report(results, output_dir='analysis_results'):
- """生成分析报告"""
- if not results:
- print("没有结果可供生成报告")
- return
-
- # 创建输出目录
- os.makedirs(output_dir, exist_ok=True)
-
- # 生成HTML报告
- html_content = """
- <!DOCTYPE html>
- <html>
- <head>
- <title>交易数据分析报告</title>
- <style>
- body { font-family: Arial, sans-serif; margin: 20px; }
- h1, h2 { color: #333; }
- table { border-collapse: collapse; width: 100%; margin-bottom: 20px; }
- th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
- th { background-color: #f2f2f2; }
- tr:nth-child(even) { background-color: #f9f9f9; }
- .image-container { margin: 20px 0; }
- .image-container img { max-width: 100%; height: auto; }
- </style>
- </head>
- <body>
- <h1>交易数据分析报告</h1>
- <p>生成时间: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """</p>
- """
-
- # 添加交易对统计
- if 'symbol_stats' in results:
- html_content += """
- <h2>交易对统计</h2>
- <table>
- <tr>
- <th>交易对</th>
- <th>交易次数</th>
- <th>总利润</th>
- <th>平均利润</th>
- <th>标准差</th>
- </tr>
- """
-
- for _, row in results['symbol_stats'].iterrows():
- html_content += f"""
- <tr>
- <td>{row['symbol']}</td>
- <td>{row['count']}</td>
- <td>{row['sum']:.4f}</td>
- <td>{row['mean']:.4f}</td>
- <td>{row['std']:.4f}</td>
- </tr>
- """
-
- html_content += "</table>"
-
- # 添加策略统计
- if 'strategy_stats' in results:
- html_content += """
- <h2>策略统计</h2>
- <table>
- <tr>
- <th>策略</th>
- <th>交易次数</th>
- <th>总利润</th>
- <th>平均利润</th>
- <th>标准差</th>
- </tr>
- """
-
- for _, row in results['strategy_stats'].iterrows():
- html_content += f"""
- <tr>
- <td>{row['strategy']}</td>
- <td>{row['count']}</td>
- <td>{row['sum']:.4f}</td>
- <td>{row['mean']:.4f}</td>
- <td>{row['std']:.4f}</td>
- </tr>
- """
-
- html_content += "</table>"
-
- # 添加状态统计
- if 'state_stats' in results:
- html_content += """
- <h2>状态统计</h2>
- <table>
- <tr>
- <th>状态</th>
- <th>交易次数</th>
- <th>总利润</th>
- <th>平均利润</th>
- </tr>
- """
-
- for _, row in results['state_stats'].iterrows():
- html_content += f"""
- <tr>
- <td>{row['currentState']}</td>
- <td>{row['count']}</td>
- <td>{row['sum']:.4f}</td>
- <td>{row['mean']:.4f}</td>
- </tr>
- """
-
- html_content += "</table>"
-
- # 添加每日统计
- if 'daily_stats' in results:
- html_content += """
- <h2>每日统计</h2>
- <table>
- <tr>
- <th>日期</th>
- <th>交易次数</th>
- <th>总利润</th>
- <th>平均利润</th>
- </tr>
- """
-
- for _, row in results['daily_stats'].iterrows():
- html_content += f"""
- <tr>
- <td>{row['date']}</td>
- <td>{row['count']}</td>
- <td>{row['sum']:.4f}</td>
- <td>{row['mean']:.4f}</td>
- </tr>
- """
-
- html_content += "</table>"
-
- # 添加图表
- html_content += """
- <h2>数据可视化</h2>
-
- <div class="image-container">
- <h3>利润分布</h3>
- <img src="profit_distribution.png" alt="利润分布">
- </div>
-
- <div class="image-container">
- <h3>价差百分比与利润的关系</h3>
- <img src="price_diff_vs_profit.png" alt="价差百分比与利润的关系">
- </div>
-
- <div class="image-container">
- <h3>每日总利润趋势</h3>
- <img src="daily_profit_trend.png" alt="每日总利润趋势">
- </div>
-
- <div class="image-container">
- <h3>各交易对总利润</h3>
- <img src="symbol_profit.png" alt="各交易对总利润">
- </div>
-
- <div class="image-container">
- <h3>各小时平均利润</h3>
- <img src="hourly_avg_profit.png" alt="各小时平均利润">
- </div>
- """
-
- # 结束HTML
- html_content += """
- </body>
- </html>
- """
-
- # 写入HTML文件
- with open(os.path.join(output_dir, 'analysis_report.html'), 'w', encoding='utf-8') as f:
- f.write(html_content)
-
- print(f"分析报告已生成: {os.path.join(output_dir, 'analysis_report.html')}")
- def main():
- """主函数"""
- import argparse
- parser = argparse.ArgumentParser(description='交易数据分析脚本')
- parser.add_argument('file_path', type=str, help='要分析的XLSX文件的路径')
- args = parser.parse_args()
- print(f"开始读取XLSX文件: {args.file_path}")
- df = read_xlsx_file(args.file_path)
-
- if df is not None and not df.empty:
- print(f"成功读取数据,共 {len(df)} 行")
-
- print("正在预处理数据...")
- processed_df = preprocess_data(df)
-
- print("正在分析数据...")
- results = analyze_data(processed_df)
-
- print("正在生成可视化图表...")
- visualize_data(processed_df, results)
-
- print("正在生成分析报告...")
- generate_report(results)
-
- print("分析完成!")
- else:
- print("没有数据可供分析")
- if __name__ == "__main__":
- main()
|