"数据库": db,
"敏感字段汇总": ", ".join(info.get("all_sensitive_fields", [])),
"总体用途": info.get("database_usage", "")
})
overall_df = pd.DataFrame(rows, columns=["数据库", "敏感字段汇总", "总体用途"])
overall_df.loc[len(overall_df)] = ["MySQL服务器", "", server_overall.get("server_overall", "")]
explanations = {
"数据库": "各数据库名称(通常对应不同的 Web 服务)",
"敏感字段汇总": "各数据库中所有被识别出的敏感字段集合",
"总体用途": "每个数据库支撑的 Web 服务总体用途及服务器业务支持情况"
}
explanation_row = [explanations.get(col, "") for col in overall_df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=overall_df.columns)
combined_df = pd.concat([explanation_df, overall_df], ignore_index=True)
combined_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall):
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
export_overall_report(writer, db_summary, server_overall)
server_df = pd.DataFrame([server_info])
common_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
export_field_explanations(writer, "服务器信息", server_df, common_explanations)
log_df = pd.DataFrame(list(log_configs.items()), columns=["配置项", "值"])
log_explanations = {"配置项": "日志相关配置项", "值": "对应配置项的值"}
export_field_explanations(writer, "日志配置", log_df, log_explanations)
if access_logs:
access_df = pd.DataFrame(access_logs)
access_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, "访问记录", access_df, access_explanations)
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, "文件权限", file_df, file_explanations)
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, "UDF信息", udf_df, udf_explanations)
for db, tables in db_structure.items():
for table, content in tables.items():
data = content["samples"]
columns = content["columns"]
df = pd.DataFrame(data, columns=columns)
table_result = table_analysis.get(db, {}).get(table, {})
sensitive_fields = table_result.get("sensitive_fields", [])
sensitive_marks = ["敏感字段" if col in sensitive_fields else "" for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
def main():
conn = connect_db()
if not conn:
return
db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
conn.close()
"敏感字段汇总": ", ".join(info.get("all_sensitive_fields", [])),
"总体用途": info.get("database_usage", "")
})
overall_df = pd.DataFrame(rows, columns=["数据库", "敏感字段汇总", "总体用途"])
overall_df.loc[len(overall_df)] = ["MySQL服务器", "", server_overall.get("server_overall", "")]
explanations = {
"数据库": "各数据库名称(通常对应不同的 Web 服务)",
"敏感字段汇总": "各数据库中所有被识别出的敏感字段集合",
"总体用途": "每个数据库支撑的 Web 服务总体用途及服务器业务支持情况"
}
explanation_row = [explanations.get(col, "") for col in overall_df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=overall_df.columns)
combined_df = pd.concat([explanation_df, overall_df], ignore_index=True)
combined_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, db_summary, table_analysis, server_info, access_logs, log_configs, file_privileges, udf_info, server_overall):
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
export_overall_report(writer, db_summary, server_overall)
server_df = pd.DataFrame([server_info])
common_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
export_field_explanations(writer, "服务器信息", server_df, common_explanations)
log_df = pd.DataFrame(list(log_configs.items()), columns=["配置项", "值"])
log_explanations = {"配置项": "日志相关配置项", "值": "对应配置项的值"}
export_field_explanations(writer, "日志配置", log_df, log_explanations)
if access_logs:
access_df = pd.DataFrame(access_logs)
access_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, "访问记录", access_df, access_explanations)
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, "文件权限", file_df, file_explanations)
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, "UDF信息", udf_df, udf_explanations)
for db, tables in db_structure.items():
for table, content in tables.items():
data = content["samples"]
columns = content["columns"]
df = pd.DataFrame(data, columns=columns)
table_result = table_analysis.get(db, {}).get(table, {})
sensitive_fields = table_result.get("sensitive_fields", [])
sensitive_marks = ["敏感字段" if col in sensitive_fields else "" for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
def main():
conn = connect_db()
if not conn:
return
db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
conn.close()