db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制,且每个请求间隔 1 秒"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY) # 请求后延迟 1 秒
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据双重判断敏感信息。请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {服务器相关风险描述},\n"
" 'access_analysis': {访问记录相关风险描述}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
# 定义总体报告字段解释
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,表明服务器安全态势。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
# field_explanations: dict, 键为字段名,值为中文解释
explanation_row = []
for col in df.columns:
explanation_row.append(field_explanations.get(col, ""))
# 在df上方插入解释行
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 定义通用字段解释(可根据实际情况扩展)
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制,且每个请求间隔 1 秒"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY) # 请求后延迟 1 秒
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据双重判断敏感信息。请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {服务器相关风险描述},\n"
" 'access_analysis': {访问记录相关风险描述}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
# 定义总体报告字段解释
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,表明服务器安全态势。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
# field_explanations: dict, 键为字段名,值为中文解释
explanation_row = []
for col in df.columns:
explanation_row.append(field_explanations.get(col, ""))
# 在df上方插入解释行
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 定义通用字段解释(可根据实际情况扩展)
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}