try:
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析函数 ======
def call_openai_api(prompt):
"""调用 OpenAI API,增加重试和请求延迟"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全与业务分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败 (尝试 {attempt+1}/{MAX_RETRY}):{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_table(db, table, table_data, server_info):
"""
分析单个数据表(第一阶段分析),利用大模型分析表中的敏感信息和潜在风险,
并判断该表是否与某个 Web 服务相关。
返回格式:{ 'sensitive_fields': [...], 'table_usage': '可能用途说明' }
"""
prompt = (
f"请基于下面提供的 MySQL 数据库【{db}】中表【{table}】的结构和样本数据,"
"识别可能的敏感信息(如身份证号、手机号、邮箱、密码等)和潜在安全风险,"
"同时判断该表是否支撑某个 Web 服务,并说明该服务可能的用途(例如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'sensitive_fields': [敏感字段, ...], 'table_usage': '可能用途说明' }\n"
"数据如下:\n" + json.dumps(table_data, ensure_ascii=False, indent=2) +
"\n服务器信息如下:\n" + json.dumps(server_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"分析 {db}.{table} 失败,响应:{response}")
result = {}
return result
def analyze_tables(db_structure, server_info):
"""
对每个数据表进行分析(第一阶段),返回结果:
{ 数据库: { 表: 分析结果, ... }, ... }
"""
table_analysis = {}
for db, tables in db_structure.items():
table_analysis[db] = {}
for table, data in tables.items():
logging.info(f"正在分析表 {db}.{table} ...")
result = analyze_table(db, table, data, server_info)
table_analysis[db][table] = result
return table_analysis
def summarize_database(db, table_analysis):
"""
对一个数据库内的所有数据表的分析结果进行汇总(第二阶段),调用大模型进一步分析,
以生成该数据库(对应一个 Web 服务)的总体敏感信息和用途描述。
返回格式:{ 'all_sensitive_fields': [...], 'database_usage': '总体用途说明' }
"""
prompt = (
f"请基于下面提供的数据库【{db}】中各数据表的分析结果,"
"汇总出该数据库中所有被识别出的敏感字段,并判断该数据库支撑的 Web 服务可能的用途(如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'all_sensitive_fields': [敏感字段, ...], 'database_usage': '总体用途说明' }\n"
"数据如下:\n" + json.dumps(table_analysis, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"数据库 {db} 汇总分析失败,响应:{response}")
result = {}
return result
def summarize_all_databases(db_structure, table_analysis):
"""
对每个数据库进行第二阶段的汇总分析,
返回结果格式:
{ 数据库: { 'all_sensitive_fields': [...], 'database_usage': '总体用途说明', 'tables': { ... } } }
"""
summary = {}
for db in db_structure.keys():
logging.info(f"正在汇总分析数据库 {db} ...")
summary[db] = summarize_database(db, table_analysis.get(db, {}))
summary[db]["tables"] = table_analysis.get(db, {})
return summary
def analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info):
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析函数 ======
def call_openai_api(prompt):
"""调用 OpenAI API,增加重试和请求延迟"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全与业务分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败 (尝试 {attempt+1}/{MAX_RETRY}):{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_table(db, table, table_data, server_info):
"""
分析单个数据表(第一阶段分析),利用大模型分析表中的敏感信息和潜在风险,
并判断该表是否与某个 Web 服务相关。
返回格式:{ 'sensitive_fields': [...], 'table_usage': '可能用途说明' }
"""
prompt = (
f"请基于下面提供的 MySQL 数据库【{db}】中表【{table}】的结构和样本数据,"
"识别可能的敏感信息(如身份证号、手机号、邮箱、密码等)和潜在安全风险,"
"同时判断该表是否支撑某个 Web 服务,并说明该服务可能的用途(例如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'sensitive_fields': [敏感字段, ...], 'table_usage': '可能用途说明' }\n"
"数据如下:\n" + json.dumps(table_data, ensure_ascii=False, indent=2) +
"\n服务器信息如下:\n" + json.dumps(server_info, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"分析 {db}.{table} 失败,响应:{response}")
result = {}
return result
def analyze_tables(db_structure, server_info):
"""
对每个数据表进行分析(第一阶段),返回结果:
{ 数据库: { 表: 分析结果, ... }, ... }
"""
table_analysis = {}
for db, tables in db_structure.items():
table_analysis[db] = {}
for table, data in tables.items():
logging.info(f"正在分析表 {db}.{table} ...")
result = analyze_table(db, table, data, server_info)
table_analysis[db][table] = result
return table_analysis
def summarize_database(db, table_analysis):
"""
对一个数据库内的所有数据表的分析结果进行汇总(第二阶段),调用大模型进一步分析,
以生成该数据库(对应一个 Web 服务)的总体敏感信息和用途描述。
返回格式:{ 'all_sensitive_fields': [...], 'database_usage': '总体用途说明' }
"""
prompt = (
f"请基于下面提供的数据库【{db}】中各数据表的分析结果,"
"汇总出该数据库中所有被识别出的敏感字段,并判断该数据库支撑的 Web 服务可能的用途(如电商、内容发布、监控等)。\n"
"请用中文输出分析结果,格式为:\n"
"{ 'all_sensitive_fields': [敏感字段, ...], 'database_usage': '总体用途说明' }\n"
"数据如下:\n" + json.dumps(table_analysis, ensure_ascii=False, indent=2)
)
response = call_openai_api(prompt)
try:
result = json.loads(response)
except json.JSONDecodeError:
logging.error(f"数据库 {db} 汇总分析失败,响应:{response}")
result = {}
return result
def summarize_all_databases(db_structure, table_analysis):
"""
对每个数据库进行第二阶段的汇总分析,
返回结果格式:
{ 数据库: { 'all_sensitive_fields': [...], 'database_usage': '总体用途说明', 'tables': { ... } } }
"""
summary = {}
for db in db_structure.keys():
logging.info(f"正在汇总分析数据库 {db} ...")
summary[db] = summarize_database(db, table_analysis.get(db, {}))
summary[db]["tables"] = table_analysis.get(db, {})
return summary
def analyze_server_overall(server_info, db_summary, access_logs, log_configs, file_privileges, udf_info):