import pandas as pd
import os
import glob
def read_excel_data(file_path):
"""读取单个 Excel 文件的所有 Sheet,返回合并后的 DataFrame"""
print(f"\n处理文件: {os.path.basename(file_path)}")
if not os.path.exists(file_path):
print(f" 错误: 文件不存在 - {file_path}")
return None
try:
sheets_dict = pd.read_excel(file_path, sheet_name=None, engine='openpyxl')
all_data = []
print(f" 找到 {len(sheets_dict)} 个 Sheet 页")
for sheet_name, df in sheets_dict.items():
df.columns = [str(c).strip() for c in df.columns]
print(f" Sheet '{sheet_name}': {len(df)} 行")
if '测试项' in df.columns and '接口耗时(ms)' in df.columns:
subset = df[['测试项', '接口耗时(ms)']].copy()
all_data.append(subset)
elif df.shape[1] >= 2:
print(f" 警告: 列名不匹配 {df.columns.tolist()},使用前两列")
subset = df.iloc[:, :2].copy()
subset.columns = ['测试项', '接口耗时(ms)']
all_data.append(subset)
else:
print(f" 跳过: 列结构不符合要求")
if all_data:
return pd.concat(all_data, ignore_index=True)
return None
except Exception as e:
print(f" 读取失败: {e}")
return None
def process_multiple_excels(input_dir):
"""处理 input 目录下所有 Excel 文件"""
print(f"扫描目录: {input_dir}")
# 查找所有 xlsx 和 xls 文件
xlsx_files = glob.glob(os.path.join(input_dir, '*.xlsx'))
xls_files = glob.glob(os.path.join(input_dir, '*.xls'))
all_files = xlsx_files + xls_files
if not all_files:
print("未找到任何 Excel 文件!")
return
print(f"找到 {len(all_files)} 个 Excel 文件:")
for f in all_files:
print(f" - {os.path.basename(f)}")
# 收集所有文件的数据
all_data = []
for file_path in all_files:
df = read_excel_data(file_path)
if df is not None:
all_data.append(df)
if not all_data:
print("\n未提取到任何有效数据。")
return
# 合并所有数据
full_df = pd.concat(all_data, ignore_index=True)
print("\n" + "="*50)
print("数据汇总:")
print("="*50)
original_count = len(full_df)
# 数据清洗
full_df['接口耗时(ms)_数值'] = pd.to_numeric(full_df['接口耗时(ms)'], errors='coerce')
invalid_rows = full_df[full_df['接口耗时(ms)'].notna() & full_df['接口耗时(ms)_数值'].isna()]
if not invalid_rows.empty:
print(f"发现 {len(invalid_rows)} 行无效数据 (将被忽略):")
print(invalid_rows[['测试项', '接口耗时(ms)']].to_string(index=False))
clean_df = full_df.dropna(subset=['测试项', '接口耗时(ms)_数值']).copy()
clean_df['接口耗时(ms)'] = clean_df['接口耗时(ms)_数值']
print(f"原始总行数: {original_count}, 清洗后有效行数: {len(clean_df)}")
if clean_df.empty:
print("清洗后无有效数据。")
return
# 计算每个测试项的平均耗时
print("\n" + "="*50)
print("详细计算过程 (Debug):")
print("="*50)
unique_items = clean_df['测试项'].unique()
final_results = []
for item in unique_items:
item_data = clean_df[clean_df['测试项'] == item]['接口耗时(ms)']
values = item_data.tolist()
avg_val = item_data.mean()
sum_val = item_data.sum()
count_val = len(item_data)
print(f"\n测试项: [{item}]")
print(f" - 原始数值列表: {values}")
print(f" - 数据个数: {count_val}")
print(f" - 总耗时: {sum_val}")
print(f" - 平均耗时: {avg_val:.4f}")
final_results.append({
'测试项': item,
'平均接口耗时(ms)': round(avg_val, 2)
})
result = pd.DataFrame(final_results)
print("\n" + "="*50)
print("最终结果摘要:")
print("="*50)
print(result.to_string(index=False))
print("="*50)
# 保存结果(保存到 input 同级目录)
output_file = os.path.join(os.path.dirname(input_dir), 'result.xlsx')
try:
result.to_excel(output_file, index=False)
print(f"\n结果已保存至: {output_file}")
except Exception as e:
print(f"\n保存文件失败: {e}")
if __name__ == "__main__":
current_dir = os.path.dirname(os.path.abspath(__file__))
input_dir = os.path.join(current_dir, 'input')
if not os.path.exists(input_dir):
print(f"错误: input 目录不存在 - {input_dir}")
print("请创建 input 目录并放入 Excel 文件")
else:
process_multiple_excels(input_dir)