一、常用方法实现
1. 使用 pandas(最常用)
import pandas as pd
import os
import glob
def merge_excel_pandas(input_path, output_file, sheet_name=0):
"""
使用pandas合并多个Excel文件
参数:
input_path: 输入文件路径或通配符模式
output_file: 输出文件路径
sheet_name: 要读取的工作表名称或索引
"""
# 获取所有Excel文件
if os.path.isdir(input_path):
all_files = glob.glob(os.path.join(input_path, "*.xlsx"))
all_files += glob.glob(os.path.join(input_path, "*.xls"))
else:
all_files = glob.glob(input_path)
if not all_files:
print("未找到Excel文件")
return
# 读取并合并所有文件
dfs = []
for file in all_files:
try:
df = pd.read_excel(file, sheet_name=sheet_name)
df['来源文件'] = os.path.basename(file) # 添加来源标记
dfs.append(df)
print(f"已读取: {file},行数: {len(df)}")
except Exception as e:
print(f"读取文件 {file} 时出错: {e}")
if not dfs:
print("没有成功读取任何文件")
return
# 合并所有DataFrame
combined_df = pd.concat(dfs, ignore_index=True)
# 保存到新文件
combined_df.to_excel(output_file, index=False)
print(f"合并完成!总行数: {len(combined_df)},保存到: {output_file}")
return combined_df
# 使用示例
# merge_excel_pandas("./data/*.xlsx", "combined.xlsx")
2. 处理多个工作表的版本
def merge_excel_with_multiple_sheets(input_path, output_file):
"""
合并多个Excel文件的所有工作表
"""
import pandas as pd
from openpyxl import load_workbook
all_files = glob.glob(os.path.join(input_path, "*.xlsx"))
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
sheet_counter = 0
for file in all_files:
try:
# 获取文件中的所有工作表名
wb = load_workbook(file, read_only=True)
sheet_names = wb.sheetnames
wb.close()
# 读取每个工作表
for sheet in sheet_names:
df = pd.read_excel(file, sheet_name=sheet)
if not df.empty:
# 生成新工作表名
new_sheet_name = f"{os.path.basename(file)}_{sheet}"
new_sheet_name = new_sheet_name[:31] # Excel工作表名限制31字符
df.to_excel(writer, sheet_name=new_sheet_name, index=False)
sheet_counter += 1
print(f"已处理: {file} - 工作表: {sheet}")
except Exception as e:
print(f"处理文件 {file} 时出错: {e}")
print(f"合并完成!共处理 {sheet_counter} 个工作表")
3. 使用 openpyxl(保留格式)
from openpyxl import Workbook, load_workbook
def merge_excel_openpyxl(files, output_file):
"""
使用openpyxl合并,保留格式
"""
wb_output = Workbook()
ws_output = wb_output.active
ws_output.title = "Combined"
header_written = False
for file_path in files:
try:
wb = load_workbook(file_path)
ws = wb.active
for row in ws.iter_rows(values_only=True):
if not header_written:
ws_output.append(row)
header_written = True
elif row[0] is not None: # 跳过空行
ws_output.append(row)
wb.close()
print(f"已合并: {file_path}")
except Exception as e:
print(f"处理 {file_path} 时出错: {e}")
wb_output.save(output_file)
print(f"合并完成,保存到: {output_file}")
4. 使用 xlrd/xlwt(老版.xls文件)
def merge_excel_xlrd(files, output_file):
"""
处理老版.xls文件
"""
import xlrd
import xlwt
output_workbook = xlwt.Workbook()
output_sheet = output_workbook.add_sheet('Combined')
row_index = 0
for file_idx, file_path in enumerate(files):
try:
workbook = xlrd.open_workbook(file_path)
sheet = workbook.sheet_by_index(0)
for r in range(sheet.nrows):
for c in range(sheet.ncols):
output_sheet.write(row_index, c, sheet.cell_value(r, c))
row_index += 1
print(f"已处理: {file_path}")
except Exception as e:
print(f"处理 {file_path} 时出错: {e}")
output_workbook.save(output_file)
print(f"合并完成,保存到: {output_file}")
二、高级功能实现
1. 带数据清洗的合并
def merge_with_data_cleaning(input_path, output_file):
"""
合并时进行数据清洗
"""
import pandas as pd
import numpy as np
all_files = glob.glob(os.path.join(input_path, "*.xlsx"))
dfs = []
for file in all_files:
df = pd.read_excel(file)
# 数据清洗示例
# 1. 去除完全空白的行
df = df.dropna(how='all')
# 2. 去除重复的标题行(假设第一列包含特定标题)
if len(df) > 0:
df = df[~df.iloc[:, 0].astype(str).str.contains('标题|header', case=False, na=False)]
# 3. 重置索引
df = df.reset_index(drop=True)
# 4. 添加文件来源
df['Source_File'] = os.path.basename(file)
df['Merge_Time'] = pd.Timestamp.now()
dfs.append(df)
# 合并所有数据
combined = pd.concat(dfs, ignore_index=True)
# 进一步的数据清洗
# 去除重复数据
combined = combined.drop_duplicates()
# 保存
combined.to_excel(output_file, index=False)
# 生成合并报告
report = {
'total_files': len(all_files),
'total_rows': len(combined),
'columns': list(combined.columns),
'data_types': combined.dtypes.to_dict()
}
return combined, report
2. 分批合并大文件
def merge_large_excel_files(files, output_file, chunk_size=10000):
"""
分批读取和合并大文件
"""
import pandas as pd
first_file = True
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
for file in files:
print(f"正在处理: {file}")
# 分批读取
chunks = pd.read_excel(file, chunksize=chunk_size)
for i, chunk in enumerate(chunks):
chunk['来源文件'] = os.path.basename(file)
if first_file:
# 第一次写入包含表头
chunk.to_excel(writer, index=False,
header=True if i == 0 else False)
first_file = False
else:
# 后续追加,不包含表头
chunk.to_excel(writer, index=False,
header=False, startrow=writer.sheets['Sheet1'].max_row)
print("大文件合并完成")
三、方法对比分析
| 特性 |
pandas |
openpyxl |
xlrd/xlwt |
建议使用场景 |
|---|
| 易用性 |
★★★★★ |
★★★☆☆ |
★★☆☆☆ |
快速开发首选pandas |
| 性能 |
★★★★☆ |
★★★☆☆ |
★★☆☆☆ |
大数据量用pandas分批处理 |
| 格式保留 |
★★☆☆☆ |
★★★★★ |
★★★☆☆ |
需要保留格式用openpyxl |
| .xls支持 |
★★★★☆ |
★★☆☆☆ |
★★★★★ |
老.xls文件用xlrd |
| 功能丰富度 |
★★★★★ |
★★★★☆ |
★★☆☆☆ |
复杂数据处理用pandas |
| 内存效率 |
★★★☆☆ |
★★★★☆ |
★★★☆☆ |
超大文件用openpyxl流式读取 |
四、最佳实践建议
1. 推荐方案选择
def smart_merge_excel(files, output_path, preserve_format=False, large_file=False):
"""
智能选择合并方案
"""
if preserve_format:
# 需要保留格式
print("使用openpyxl合并(保留格式)")
merge_excel_openpyxl(files, output_path)
elif large_file:
# 处理大文件
print("使用pandas分批处理大文件")
merge_large_excel_files(files, output_path)
elif any(f.endswith('.xls') for f in files):
# 包含老版.xls文件
print("使用pandas(兼容.xls和.xlsx)")
merge_excel_pandas(files, output_path)
else:
# 默认使用pandas
print("使用pandas合并")
merge_excel_pandas(files, output_path)
2. 完整工具类示例
class ExcelMerger:
"""Excel合并工具类"""
def __init__(self, engine='auto'):
"""
初始化合并器
engine: 'auto', 'pandas', 'openpyxl', 'xlrd'
"""
self.engine = engine
def merge(self, input_pattern, output_file, **kwargs):
"""主合并方法"""
# 获取文件列表
files = glob.glob(input_pattern)
if not files:
raise ValueError(f"未找到匹配的文件: {input_pattern}")
# 自动选择引擎
if self.engine == 'auto':
engine = self._auto_select_engine(files, kwargs)
else:
engine = self.engine
# 执行合并
if engine == 'pandas':
return self._merge_with_pandas(files, output_file, **kwargs)
elif engine == 'openpyxl':
return self._merge_with_openpyxl(files, output_file, **kwargs)
else:
raise ValueError(f"不支持的引擎: {engine}")
def _auto_select_engine(self, files, kwargs):
"""自动选择最合适的引擎"""
# 检查是否需要保留格式
if kwargs.get('preserve_format', False):
return 'openpyxl'
# 检查是否包含.xls文件
if any(f.endswith('.xls') for f in files):
return 'pandas' # pandas支持.xls
# 默认使用pandas
return 'pandas'
def _merge_with_pandas(self, files, output_file, **kwargs):
"""使用pandas合并"""
dfs = []
for file in files:
df = pd.read_excel(file, **kwargs)
df['source_file'] = os.path.basename(file)
dfs.append(df)
combined = pd.concat(dfs, ignore_index=True)
combined.to_excel(output_file, index=False)
return {
'status': 'success',
'file_count': len(files),
'total_rows': len(combined),
'output_file': output_file
}
# 使用示例
if __name__ == "__main__":
# 简单合并
merger = ExcelMerger()
result = merger.merge("./data/*.xlsx", "output/combined.xlsx")
print(result)
# 保留格式合并
merger2 = ExcelMerger(engine='openpyxl')
result2 = merger2.merge("./data/*.xlsx", "output/formatted.xlsx",
preserve_format=True)
五、性能优化建议
大文件处理:使用
chunksize参数分批读取
内存管理:及时删除不再需要的DataFrame
并行处理:对于大量小文件可以使用多线程
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
def parallel_merge_excel(files, output_file, max_workers=4):
"""并行读取Excel文件"""
def read_file(file):
return pd.read_excel(file)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
dfs = list(executor.map(read_file, files))
combined = pd.concat(dfs, ignore_index=True)
combined.to_excel(output_file, index=False)
return combined
总结
- 简单场景:直接使用pandas的
pd.concat()方法
- 需要保留格式:使用openpyxl
- 处理大文件:使用分批读取策略
- 老版.xls文件:使用pandas或xlrd
- 生产环境:建议封装成工具类,加入错误处理和日志记录
根据具体需求选择合适的工具,通常pandas是最通用的选择,而openpyxl适合需要精细控制格式的场景。