return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(2)
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
"请用中文输出分析结果,输出格式为:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,每个字段增加中文解释
"""
# 定义字段中文解释
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,如身份证号、手机号、密码等。",
"server_analysis": "服务器相关分析结果,包括版本、日志配置、文件权限、UDF 提权风险等信息。",
"access_analysis": "访问记录分析结果,展示访问数据库的服务器信息及潜在风险。"
}
# 构造总体报告数据
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
# 将 value 转换为字符串格式,便于展示
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 导出服务器信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
# 导出日志配置(调整为两列展示)
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_df.to_excel(writer, sheet_name='日志配置', index=False)
# 导出访问记录
if access_logs:
access_df = pd.DataFrame(access_logs)
access_df.to_excel(writer, sheet_name='访问记录', index=False)
# 导出文件权限
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_df.to_excel(writer, sheet_name='文件权限', index=False)
# 导出 UDF 信息
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_df.to_excel(writer, sheet_name='UDF信息', index=False)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(2)
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
"请用中文输出分析结果,输出格式为:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,每个字段增加中文解释
"""
# 定义字段中文解释
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,如身份证号、手机号、密码等。",
"server_analysis": "服务器相关分析结果,包括版本、日志配置、文件权限、UDF 提权风险等信息。",
"access_analysis": "访问记录分析结果,展示访问数据库的服务器信息及潜在风险。"
}
# 构造总体报告数据
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
# 将 value 转换为字符串格式,便于展示
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 导出服务器信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
# 导出日志配置(调整为两列展示)
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_df.to_excel(writer, sheet_name='日志配置', index=False)
# 导出访问记录
if access_logs:
access_df = pd.DataFrame(access_logs)
access_df.to_excel(writer, sheet_name='访问记录', index=False)
# 导出文件权限
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_df.to_excel(writer, sheet_name='文件权限', index=False)
# 导出 UDF 信息
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_df.to_excel(writer, sheet_name='UDF信息', index=False)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段