def export_field_explanations(writer, sheet_name, df, field_explanations):
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel_multiple(overall_data, analysis_result):
"""
导出多个 MySQL 服务器采集的数据到 Excel,包括每个服务器的总体报告、服务器信息、
日志配置、访问记录、文件权限、UDF 信息,以及各数据表及分析结果。
overall_data 为一个字典,包含各服务器采集的信息,结构如下:
{
"db_structures": {conn_id: {数据库: {表: {columns: [...], samples: [...]}, ...}, ...},
"server_infos": {conn_id: {服务器信息}},
"access_logs": {conn_id: [...]},
"log_configs": {conn_id: {...}},
"file_privileges": {conn_id: [...]},
"udf_infos": {conn_id: [...]}
}
"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 遍历每个 MySQL 服务器
for conn_id in overall_data["server_infos"]:
# 导出总体报告
export_overall_report(writer, analysis_result, conn_id)
# 导出服务器信息
sheet_name = f"服务器信息_{conn_id}"[:31]
server_df = pd.DataFrame([overall_data["server_infos"][conn_id]])
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
# 导出日志配置
sheet_name = f"日志配置_{conn_id}"[:31]
log_df = pd.DataFrame(list(overall_data["log_configs"][conn_id].items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# 导出访问记录
if overall_data["access_logs"].get(conn_id):
sheet_name = f"访问记录_{conn_id}"[:31]
access_df = pd.DataFrame(overall_data["access_logs"][conn_id])
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# 导出文件权限
if overall_data["file_privileges"].get(conn_id):
sheet_name = f"文件权限_{conn_id}"[:31]
file_df = pd.DataFrame(overall_data["file_privileges"][conn_id])
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# 导出 UDF 信息
if overall_data["udf_infos"].get(conn_id):
sheet_name = f"UDF信息_{conn_id}"[:31]
udf_df = pd.DataFrame(overall_data["udf_infos"][conn_id])
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# 导出各数据表及分析结果
db_structures = overall_data["db_structures"].get(conn_id, {})
for db, tables in db_structures.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(conn_id, {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel_multiple(overall_data, analysis_result):
"""
导出多个 MySQL 服务器采集的数据到 Excel,包括每个服务器的总体报告、服务器信息、
日志配置、访问记录、文件权限、UDF 信息,以及各数据表及分析结果。
overall_data 为一个字典,包含各服务器采集的信息,结构如下:
{
"db_structures": {conn_id: {数据库: {表: {columns: [...], samples: [...]}, ...}, ...},
"server_infos": {conn_id: {服务器信息}},
"access_logs": {conn_id: [...]},
"log_configs": {conn_id: {...}},
"file_privileges": {conn_id: [...]},
"udf_infos": {conn_id: [...]}
}
"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 遍历每个 MySQL 服务器
for conn_id in overall_data["server_infos"]:
# 导出总体报告
export_overall_report(writer, analysis_result, conn_id)
# 导出服务器信息
sheet_name = f"服务器信息_{conn_id}"[:31]
server_df = pd.DataFrame([overall_data["server_infos"][conn_id]])
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
# 导出日志配置
sheet_name = f"日志配置_{conn_id}"[:31]
log_df = pd.DataFrame(list(overall_data["log_configs"][conn_id].items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# 导出访问记录
if overall_data["access_logs"].get(conn_id):
sheet_name = f"访问记录_{conn_id}"[:31]
access_df = pd.DataFrame(overall_data["access_logs"][conn_id])
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# 导出文件权限
if overall_data["file_privileges"].get(conn_id):
sheet_name = f"文件权限_{conn_id}"[:31]
file_df = pd.DataFrame(overall_data["file_privileges"][conn_id])
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# 导出 UDF 信息
if overall_data["udf_infos"].get(conn_id):
sheet_name = f"UDF信息_{conn_id}"[:31]
udf_df = pd.DataFrame(overall_data["udf_infos"][conn_id])
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# 导出各数据表及分析结果
db_structures = overall_data["db_structures"].get(conn_id, {})
for db, tables in db_structures.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(conn_id, {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]