try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(2) # 等待后重试
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
"输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出服务器信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
# 导出日志配置(调整为两列展示)
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_df.to_excel(writer, sheet_name='日志配置', index=False)
# 导出访问记录
if access_logs:
access_df = pd.DataFrame(access_logs)
access_df.to_excel(writer, sheet_name='访问记录', index=False)
# 导出文件权限
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_df.to_excel(writer, sheet_name='文件权限', index=False)
# 导出 UDF 信息
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_df.to_excel(writer, sheet_name='UDF信息', index=False)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 处理 sheet 名称长度和重复问题
sheet_name = f"{db}_{table}"
sheet_name = sheet_name[:31]
df.to_excel(writer, sheet_name=sheet_name, index=False)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
conn = connect_db()
if not conn:
return
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(2) # 等待后重试
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
"输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出服务器信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
# 导出日志配置(调整为两列展示)
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_df.to_excel(writer, sheet_name='日志配置', index=False)
# 导出访问记录
if access_logs:
access_df = pd.DataFrame(access_logs)
access_df.to_excel(writer, sheet_name='访问记录', index=False)
# 导出文件权限
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_df.to_excel(writer, sheet_name='文件权限', index=False)
# 导出 UDF 信息
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_df.to_excel(writer, sheet_name='UDF信息', index=False)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 处理 sheet 名称长度和重复问题
sheet_name = f"{db}_{table}"
sheet_name = sheet_name[:31]
df.to_excel(writer, sheet_name=sheet_name, index=False)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
conn = connect_db()
if not conn:
return