try:
cursor.execute(f"USE
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
cursor.execute(f"DESCRIBE
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取 {db}.{table} 字段信息失败: {e}")
continue
try:
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API,增加重试机制及请求延迟"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_with_openai(data):
"""
使用 OpenAI 分析所有 MySQL 采集的数据,识别敏感信息和潜在渗透风险,
并分析各服务器是否支撑某类 Web 服务及其可能用途。
"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的多个 MySQL 服务器的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据判断敏感信息。"
"\n另外,请分析每个 MySQL 服务器是否支撑某个 Web 服务,以及该 Web 服务器可能的功能用途(如电商、内容发布、监控等)。"
"\n请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {MySQL服务器标识: {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {MySQL服务器标识: 服务器相关风险描述及 Web 服务功能说明, ...},\n"
" 'access_analysis': {MySQL服务器标识: 访问记录相关风险描述, ...}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result, conn_id):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,可能泄露个人隐私。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务功能及可能用途。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {}).get(conn_id, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
sheet_name = f"总体报告_{conn_id}"[:31]
report_df.to_excel(writer, sheet_name=sheet_name, index=False)
cursor.execute(f"USE
{db}")cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
cursor.execute(f"DESCRIBE
{table}")columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取 {db}.{table} 字段信息失败: {e}")
continue
try:
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API,增加重试机制及请求延迟"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_with_openai(data):
"""
使用 OpenAI 分析所有 MySQL 采集的数据,识别敏感信息和潜在渗透风险,
并分析各服务器是否支撑某类 Web 服务及其可能用途。
"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的多个 MySQL 服务器的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据判断敏感信息。"
"\n另外,请分析每个 MySQL 服务器是否支撑某个 Web 服务,以及该 Web 服务器可能的功能用途(如电商、内容发布、监控等)。"
"\n请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {MySQL服务器标识: {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {MySQL服务器标识: 服务器相关风险描述及 Web 服务功能说明, ...},\n"
" 'access_analysis': {MySQL服务器标识: 访问记录相关风险描述, ...}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result, conn_id):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,可能泄露个人隐私。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务功能及可能用途。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {}).get(conn_id, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
sheet_name = f"总体报告_{conn_id}"[:31]
report_df.to_excel(writer, sheet_name=sheet_name, index=False)