BroadcastChannel
https://channel.gandli.eu.org/
https://channel.gandli.eu.org/
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Call the OpenAI chat API with retries, pausing REQUEST_DELAY seconds after each request.

    Args:
        prompt: user-message text sent after a fixed Chinese system prompt.

    Returns:
        The stripped completion text, or "" if every one of MAX_RETRY attempts fails.
    """
    openai.api_key = OPENAI_API_KEY
    attempt = 0
    while attempt < MAX_RETRY:
        attempt += 1
        try:
            reply = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全分析专家。"},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=2000,
            )
        except Exception as exc:
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt}/{MAX_RETRY},错误:{exc}")
            time.sleep(REQUEST_DELAY)
        else:
            # Throttle even on success so consecutive calls stay REQUEST_DELAY apart.
            time.sleep(REQUEST_DELAY)
            return reply['choices'][0]['message']['content'].strip()
    return ""
def analyze_with_openai(data):
    """Ask OpenAI to analyse the collected DB/server data and parse the reply as JSON.

    Args:
        data: dict of collected structures (db_structure, server_info, ...).

    Returns:
        The parsed analysis dict, or {} when the response cannot be parsed.
    """
    logging.info("正在通过 OpenAI 分析...")
    prompt = (
        "请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
        "识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
        "视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
        "请结合字段名和样本数据双重判断敏感信息。"
        "\n另外,请分析这些数据是否用于支撑某个 Web 服务器的功能或内容,并说明该 Web 服务器的可能用途(如电商、内容发布、监控等)。"
        # BUG FIX: the format example previously used single quotes (Python-dict style),
        # but the reply is parsed with json.loads, which only accepts double-quoted
        # strict JSON — request valid JSON explicitly.
        "\n请仅输出合法的 JSON(使用双引号,不要输出任何多余文字),格式如下:\n"
        '{\n  "sensitive_fields": {数据库: {表: [敏感字段, ...], ...}, ...},\n'
        '  "server_analysis": {服务器相关风险描述及 Web 服务器功能说明},\n'
        '  "access_analysis": {访问记录相关风险描述}\n}\n\n'
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )
    response = call_openai_api(prompt)
    # Models often wrap JSON in ``` fences; strip them before parsing.
    cleaned = response.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`").strip()
        if cleaned.lower().startswith("json"):
            cleaned = cleaned[4:]
    try:
        analysis_result = json.loads(cleaned)
        logging.info("OpenAI 分析完成!")
        return analysis_result
    except json.JSONDecodeError:
        logging.error("OpenAI 响应解析失败,原始响应:" + response)
        return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
    """Write the "总体报告" sheet: one row per analysis item.

    Each row holds the item key, a pretty-printed JSON summary of its value,
    and a fixed Chinese explanation of what the item means.
    """
    explanations = {
        "sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
        "server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务器的功能或内容。",
        "access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
    }
    rows = [
        {
            "分析项": item,
            "结果摘要": json.dumps(analysis_result.get(item, {}), ensure_ascii=False, indent=2),
            "中文解释": note,
        }
        for item, note in explanations.items()
    ]
    frame = pd.DataFrame(rows, columns=["分析项", "结果摘要", "中文解释"])
    frame.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
    """Write df to sheet_name with a Chinese explanation row inserted under the header.

    Columns missing from field_explanations get an empty explanation cell.
    """
    header_notes = pd.DataFrame(
        [[field_explanations.get(column, "") for column in df.columns]],
        columns=df.columns,
    )
    stacked = pd.concat([header_notes, df], ignore_index=True)
    stacked.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Call the OpenAI chat API with retries, pausing REQUEST_DELAY seconds after each request.

    Args:
        prompt: user-message text sent after a fixed Chinese system prompt.

    Returns:
        The stripped completion text, or "" if every one of MAX_RETRY attempts fails.
    """
    openai.api_key = OPENAI_API_KEY
    for attempt in range(MAX_RETRY):
        try:
            response = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全分析专家。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=2000
            )
            # Throttle even on success so consecutive calls stay REQUEST_DELAY apart.
            time.sleep(REQUEST_DELAY)
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
            time.sleep(REQUEST_DELAY)
    return ""
def analyze_with_openai(data):
    """Send the collected DB/server data to OpenAI and parse the reply as JSON.

    Returns the parsed dict, or {} when the response is not valid JSON.

    NOTE(review): the format example in the prompt uses single quotes, but the
    reply is parsed with json.loads, which requires double-quoted strict JSON —
    parsing is likely to fail unless the model normalizes the quoting.
    """
    logging.info("正在通过 OpenAI 分析...")
    prompt = (
        "请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
        "识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
        "视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
        "请结合字段名和样本数据双重判断敏感信息。"
        "\n另外,请分析这些数据是否用于支撑某个 Web 服务器的功能或内容,并说明该 Web 服务器的可能用途(如电商、内容发布、监控等)。"
        "\n请用中文输出分析结果,格式如下:\n"
        "{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
        " 'server_analysis': {服务器相关风险描述及 Web 服务器功能说明},\n"
        " 'access_analysis': {访问记录相关风险描述}\n}\n\n"
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )
    response = call_openai_api(prompt)
    try:
        analysis_result = json.loads(response)
        logging.info("OpenAI 分析完成!")
        return analysis_result
    except json.JSONDecodeError:
        logging.error("OpenAI 响应解析失败,原始响应:" + response)
        return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
    """Write the "总体报告" (overall report) sheet.

    One row per analysis item: the item key, a pretty-printed JSON summary of
    its value, and a fixed Chinese explanation of what the item means.

    Args:
        writer: an open pandas ExcelWriter.
        analysis_result: dict as returned by analyze_with_openai; missing keys
            are rendered as empty objects.
    """
    explanations = {
        "sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
        "server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务器的功能或内容。",
        "access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
    }
    report_rows = []
    for key, explanation in explanations.items():
        value = analysis_result.get(key, {})
        report_rows.append({
            "分析项": key,
            "结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
            "中文解释": explanation
        })
    report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
    report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
    """Write df to sheet_name with a Chinese explanation row inserted under the header.

    Columns without an entry in field_explanations get an empty cell.
    """
    explanation_row = [field_explanations.get(col, "") for col in df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
    combined_df = pd.concat([explanation_df, df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
import os
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# ============ Configuration ============
# MySQL connection parameters; every value comes from the environment
# (.env, loaded above) with a development default.
DB_CONFIG = {
    'host': os.getenv("DB_HOST", "localhost"),
    'user': os.getenv("DB_USER", "root"),
    'password': os.getenv("DB_PASSWORD", "yourpassword"),
    'port': int(os.getenv("DB_PORT", 3306)),
    'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3))  # OpenAI API retry count
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))  # delay between requests, seconds
# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
    """Open a MySQL connection using DB_CONFIG.

    Returns the pymysql connection on success, or None if connecting fails
    (the error is logged).
    """
    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
    except Exception as exc:
        logging.error(f"数据库连接失败: {exc}")
    else:
        logging.info("数据库连接成功")
    return connection
# ====== 数据采集 ======
def collect_db_info(conn):
    """Enumerate databases, tables, columns, sample rows, server info,
    process list, log configuration, file privileges and UDF info.

    Reconstructed from a corrupted paste: the f-string statements were cut
    mid-line and the sample-fetch tail was detached; identifiers are now
    backtick-quoted to keep the interpolated names from breaking the SQL.

    Args:
        conn: an open pymysql connection.

    Returns:
        Tuple (db_structure, server_info, access_logs, log_configs,
        file_privileges, udf_info).
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []
    with conn.cursor() as cursor:
        try:
            # Server information
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")
        try:
            # Access records (requires PROCESS privilege)
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
        try:
            # Logging configuration
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))
        try:
            # File privileges (sample query; may need adjusting per environment)
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))
        try:
            # UDF privilege-escalation info
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))
        try:
            # Database structure and sample data
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []
        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas
            try:
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue
            db_structure[db] = {}
            for table in tables:
                try:
                    # Column names
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue
                try:
                    # First 5 sample rows
                    cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
                    samples = []
                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }
    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# ============ Configuration ============
# MySQL connection parameters; every value comes from the environment
# (.env, loaded above) with a development default.
DB_CONFIG = {
    'host': os.getenv("DB_HOST", "localhost"),
    'user': os.getenv("DB_USER", "root"),
    'password': os.getenv("DB_PASSWORD", "yourpassword"),
    'port': int(os.getenv("DB_PORT", 3306)),
    'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3))  # OpenAI API retry count
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1))  # delay between requests, seconds
# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
    """Establish a database connection.

    Returns a pymysql connection built from DB_CONFIG on success, or None if
    the connect fails (the error is logged).
    """
    try:
        conn = pymysql.connect(**DB_CONFIG)
        logging.info("数据库连接成功")
        return conn
    except Exception as e:
        logging.error(f"数据库连接失败: {e}")
        return None
# ====== 数据采集 ======
def collect_db_info(conn):
    """Enumerate databases, tables, columns, sample rows, server info,
    process list, log configuration, file privileges and UDF info.

    Reconstructed from a corrupted paste: the f-string statements were cut
    mid-line and the sample-fetch tail was detached; identifiers are now
    backtick-quoted to keep the interpolated names from breaking the SQL.

    Args:
        conn: an open pymysql connection.

    Returns:
        Tuple (db_structure, server_info, access_logs, log_configs,
        file_privileges, udf_info).
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []
    with conn.cursor() as cursor:
        try:
            # Server information
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")
        try:
            # Access records (requires PROCESS privilege)
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
        try:
            # Logging configuration
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))
        try:
            # File privileges (sample query; may need adjusting per environment)
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))
        try:
            # UDF privilege-escalation info
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))
        try:
            # Database structure and sample data
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []
        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas
            try:
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue
            db_structure[db] = {}
            for table in tables:
                try:
                    # Column names
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue
                try:
                    # First 5 sample rows
                    cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
                    samples = []
                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }
    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
操作指南:
1. 配置多数据库连接
- 创建 multi_db_config.json 文件
- 对敏感凭证进行Fernet加密:
python
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher = Fernet(key)
encrypted_pwd = cipher.encrypt(b"real_password").decode()
2. 执行审计扫描
bash
export DECRYPTION_KEY="your_key"
python audit_tool.py
3. 查看输出报告
- 自动生成 report_<timestamp>.md
- 包含字段加密分析和GPT-4生成的渗透建议
该方案通过模块化设计支持企业级数据库审计需求,结合规则引擎与AI分析实现深度安全检测。
1. 配置多数据库连接
- 创建 multi_db_config.json 文件
- 对敏感凭证进行Fernet加密:
python
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher = Fernet(key)
encrypted_pwd = cipher.encrypt(b"real_password").decode()
2. 执行审计扫描
bash
export DECRYPTION_KEY="your_key"
python audit_tool.py
3. 查看输出报告
- 自动生成 report_<timestamp>.md
- 包含字段加密分析和GPT-4生成的渗透建议
该方案通过模块化设计支持企业级数据库审计需求,结合规则引擎与AI分析实现深度安全检测。
def connect_all(self):
    """Open a connection for every config in self.configs.

    Reconstructed from a corrupted paste: the list literal and dict
    subscripts had lost their brackets, and the config dict must be
    unpacked as keyword arguments for pymysql.connect.

    Returns:
        List of successfully opened pymysql connections; failed configs
        are logged and skipped.
    """
    connections = []
    for cfg in self.configs:
        try:
            conn = pymysql.connect(**cfg)
            connections.append(conn)
            logging.info(f"成功连接数据库:{cfg['host']}:{cfg['port']}")
        except Exception as e:
            logging.error(f"连接失败 {cfg['host']}:{str(e)}")
    return connections
====== 增强型数据采集 ======
def enhanced_collect(conn) -> dict:
    """增强型数据采集(包含密码字段检测)

    Reconstructed from a corrupted paste (lost brackets, regex pipes and
    cut f-strings). Walks every non-system database, records each table's
    columns, flags password-like columns by name, and runs CryptoDetector
    over a sample value to classify the hash/cipher type.

    Args:
        conn: an open pymysql connection.

    Returns:
        {db: {table: {'columns': [...], 'sensitive': {col: {...}}}}}
    """
    data = {}
    with conn.cursor() as cursor:
        # Enumerate all databases
        cursor.execute("SHOW DATABASES")
        databases = [row[0] for row in cursor.fetchall()]
        for db in databases:
            if db in ('mysql', 'sys'):
                continue
            data[db] = {}
            tbl = "?"  # defined up-front so the except message below never NameErrors
            try:
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [row[0] for row in cursor.fetchall()]
                for tbl in tables:
                    # Table structure
                    cursor.execute(f"DESCRIBE `{tbl}`")
                    columns = [col[0] for col in cursor.fetchall()]
                    # Password-like column detection (alternation pipes restored)
                    sensitive_cols = [
                        col for col in columns
                        if re.search(r'passw(or)?d|pwd|hash|salt', col, re.I)
                    ]
                    # Sample the sensitive columns and classify their encryption
                    crypto_analysis = {}
                    if sensitive_cols:
                        cursor.execute(
                            f"SELECT {','.join(sensitive_cols)} FROM `{tbl}` LIMIT 5"
                        )
                        samples = cursor.fetchall()
                        for idx, col in enumerate(sensitive_cols):
                            sample = samples[0][idx] if samples else ''
                            crypto_types = CryptoDetector.detect_crypto_type(sample)
                            crypto_analysis[col] = {
                                'types': crypto_types,
                                'advice': CryptoDetector.get_decrypt_advice(crypto_types)
                            }
                    data[db][tbl] = {
                        'columns': columns,
                        'sensitive': crypto_analysis
                    }
            except Exception as e:
                logging.error(f"采集失败 {db}.{tbl}:{str(e)}")
    return data
====== 改进后的主流程 ======
def main():
    """Multi-database audit flow: connect, collect, get AI advice, save reports.

    Reconstructed from a corrupted paste (lost brackets and bare Chinese
    comments). generate_analysis_prompt / save_markdown_report /
    merge_reports are expected elsewhere in the project — TODO confirm.
    """
    # Initialise all configured database connections
    db_mgr = DBConnectionManager("multi_db_config.json")
    connections = db_mgr.connect_all()
    all_reports = []
    for conn in connections:
        try:
            # Enhanced collection
            report = enhanced_collect(conn)
            all_reports.append(report)
            # Penetration-advice report
            analysis_prompt = generate_analysis_prompt(report)
            # BUG FIX: `messages` must be a list of message dicts, not a bare dict.
            ai_advice = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": analysis_prompt}]
            )
            # Persist results to Markdown
            save_markdown_report(report, ai_advice)
        finally:
            conn.close()
    # Merge all per-database reports
    merge_reports(all_reports)
核心改进说明:
1. 多数据库支持
json
// multi_db_config.json
{
"host": "db1.example.com",
"user": "audit_user",
"password": "gAAAAABk...(加密凭证)",
"encrypted": true,
"port": 3306
},
{
"host": "db2.example.com",
"user": "readonly_user",
"password": "plaintext_pass",
"port": 3307
}
2. 加密类型识别逻辑
python
输入样本检测示例
sample = "5f4dcc3b5aa765d61d8327deb882cf99" MD5
CryptoDetector.detect_crypto_type(sample)
输出: {'md5': 0.9}
解密建议生成
advice = CryptoDetector.get_decrypt_advice({'md5': 0.9})
print(advice)
输出: "MD5哈希:使用彩虹表碰撞(如hashcat -m 0)或在线解密网站"
3. 渗透建议报告示例
markdown
安全审计报告 - 192.168.1.100
敏感字段分析
数据库 表 字段 加密类型 渗透建议
user_db accounts password bcrypt 需GPU集群暴力破解(约$1/10亿次)
order_db transactions card_token AES 查找配置文件中的AES密钥
AI建议
尝试以下渗透路径:
1. 从web应用的/config目录查找aes_key.txt
2. 使用JohnTheRipper进行bcrypt爆破
3. 检查数据库备份文件中的测试账号
"""建立所有数据库连接"""
connections =
for cfg in self.configs:
try:
conn = pymysql.connect(cfg)
connections.append(conn)
logging.info(f"成功连接数据库:{cfg'host'}:{cfg'port'}")
except Exception as e:
logging.error(f"连接失败 {cfg'host'}:{str(e)}")
return connections
====== 增强型数据采集 ======
def enhanced_collect(conn) -> dict:
    """增强型数据采集(包含密码字段检测)

    Reconstructed from a corrupted paste (lost brackets, regex pipes and
    cut f-strings). Walks every non-system database, records each table's
    columns, flags password-like columns by name, and runs CryptoDetector
    over a sample value to classify the hash/cipher type.

    Args:
        conn: an open pymysql connection.

    Returns:
        {db: {table: {'columns': [...], 'sensitive': {col: {...}}}}}
    """
    data = {}
    with conn.cursor() as cursor:
        # Enumerate all databases
        cursor.execute("SHOW DATABASES")
        databases = [row[0] for row in cursor.fetchall()]
        for db in databases:
            if db in ('mysql', 'sys'):
                continue
            data[db] = {}
            tbl = "?"  # defined up-front so the except message below never NameErrors
            try:
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [row[0] for row in cursor.fetchall()]
                for tbl in tables:
                    # Table structure
                    cursor.execute(f"DESCRIBE `{tbl}`")
                    columns = [col[0] for col in cursor.fetchall()]
                    # Password-like column detection (alternation pipes restored)
                    sensitive_cols = [
                        col for col in columns
                        if re.search(r'passw(or)?d|pwd|hash|salt', col, re.I)
                    ]
                    # Sample the sensitive columns and classify their encryption
                    crypto_analysis = {}
                    if sensitive_cols:
                        cursor.execute(
                            f"SELECT {','.join(sensitive_cols)} FROM `{tbl}` LIMIT 5"
                        )
                        samples = cursor.fetchall()
                        for idx, col in enumerate(sensitive_cols):
                            sample = samples[0][idx] if samples else ''
                            crypto_types = CryptoDetector.detect_crypto_type(sample)
                            crypto_analysis[col] = {
                                'types': crypto_types,
                                'advice': CryptoDetector.get_decrypt_advice(crypto_types)
                            }
                    data[db][tbl] = {
                        'columns': columns,
                        'sensitive': crypto_analysis
                    }
            except Exception as e:
                logging.error(f"采集失败 {db}.{tbl}:{str(e)}")
    return data
====== 改进后的主流程 ======
def main():
    """Multi-database audit flow: connect, collect, get AI advice, save reports.

    Reconstructed from a corrupted paste (lost brackets and bare Chinese
    comments). generate_analysis_prompt / save_markdown_report /
    merge_reports are expected elsewhere in the project — TODO confirm.
    """
    # Initialise all configured database connections
    db_mgr = DBConnectionManager("multi_db_config.json")
    connections = db_mgr.connect_all()
    all_reports = []
    for conn in connections:
        try:
            # Enhanced collection
            report = enhanced_collect(conn)
            all_reports.append(report)
            # Penetration-advice report
            analysis_prompt = generate_analysis_prompt(report)
            # BUG FIX: `messages` must be a list of message dicts, not a bare dict.
            ai_advice = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": analysis_prompt}]
            )
            # Persist results to Markdown
            save_markdown_report(report, ai_advice)
        finally:
            conn.close()
    # Merge all per-database reports
    merge_reports(all_reports)
核心改进说明:
1. 多数据库支持
json
// multi_db_config.json
{
"host": "db1.example.com",
"user": "audit_user",
"password": "gAAAAABk...(加密凭证)",
"encrypted": true,
"port": 3306
},
{
"host": "db2.example.com",
"user": "readonly_user",
"password": "plaintext_pass",
"port": 3307
}
2. 加密类型识别逻辑
python
输入样本检测示例
sample = "5f4dcc3b5aa765d61d8327deb882cf99" MD5
CryptoDetector.detect_crypto_type(sample)
输出: {'md5': 0.9}
解密建议生成
advice = CryptoDetector.get_decrypt_advice({'md5': 0.9})
print(advice)
输出: "MD5哈希:使用彩虹表碰撞(如hashcat -m 0)或在线解密网站"
3. 渗透建议报告示例
markdown
安全审计报告 - 192.168.1.100
敏感字段分析
数据库 表 字段 加密类型 渗透建议
user_db accounts password bcrypt 需GPU集群暴力破解(约$1/10亿次)
order_db transactions card_token AES 查找配置文件中的AES密钥
AI建议
尝试以下渗透路径:
1. 从web应用的/config目录查找aes_key.txt
2. 使用JohnTheRipper进行bcrypt爆破
3. 检查数据库备份文件中的测试账号
好的,我现在需要帮用户解决他们的问题,即如何修改现有的Python脚本,使其支持多个MySQL连接,并且在遇到密文存储的密码时,能够给出类型判断和解密建议。首先,我得仔细理解用户的需求。
用户当前的脚本只能连接单个数据库,他们希望扩展到多个连接。这意味着需要在配置文件中支持多个数据库配置,比如通过JSON或YAML格式,每个数据库有自己的连接参数。然后,脚本需要遍历这些配置,逐一连接并采集数据。这涉及到修改数据采集部分,使其循环处理每个数据库连接,并可能将结果合并或分别处理。
接下来是关于密文密码的处理。用户提到当遇到密文存储的密码时,需要判断其加密类型,并提供解密建议。常见的加密方式包括哈希(如MD5、SHA-1、bcrypt)、对称加密(如AES、DES)和非对称加密(如RSA)。此外,有些系统可能使用自定义的加密方法或组合多种方式。
首先,我需要考虑如何检测密码字段。可能的方法包括字段名分析(如字段名包含“password”、“pwd”、“hash”等)、数据模式匹配(如MD5哈希是32位十六进制,bcrypt以$2a$开头),以及样本数据分析(如数据是否符合特定加密模式)。
然后,针对检测到的加密类型,给出解密建议。例如,如果是MD5哈希,建议使用彩虹表碰撞或暴力破解;如果是bcrypt,说明其安全性较高,破解难度大;如果是AES加密,则需要获取密钥和IV才能解密。此外,还要考虑是否有现成的工具或库可以辅助解密,比如hashcat对于哈希破解,或OpenSSL用于对称加密解密。
在实现上,可能需要一个加密检测模块,该模块接收字段名和样本数据,返回可能的加密类型和置信度。然后,根据检测结果生成建议。这部分可能需要集成一些正则表达式匹配、特征字符串识别,甚至调用外部库进行深度分析。
同时,用户要求支持多个MySQL连接,这意味着需要重构现有的数据库连接逻辑,从单一的连接处理改为循环处理多个配置。这可能涉及到配置文件的解析,错误处理(如某个数据库连接失败时继续处理下一个),以及结果汇总。
此外,还需要考虑如何将不同数据库的分析结果整合到最终的Excel报告中,可能需要为每个数据库创建单独的工作表,或者在总体报告中区分来源。
在代码结构上,可能需要将数据库连接和数据采集部分封装成函数,便于循环调用。加密检测和解密建议部分可以作为独立的函数或类,供数据采集过程中调用。
测试方面,需要确保多数据库配置的正确性,处理各种可能的异常情况,如无效的配置项、连接超时等。同时,加密检测的准确性也需要通过测试用例验证,比如不同加密类型的样本数据是否能被正确识别。
最后,用户可能还希望脚本能够自动化处理尽可能多的情况,减少手动干预,因此需要详细的日志记录和错误报告,方便用户查看哪些数据库处理成功,哪些失败,以及失败的原因。
总结来说,我需要分步骤处理:首先修改配置处理以支持多数据库,然后调整数据采集逻辑,接着实现加密检测与建议功能,最后整合所有内容并确保输出报告的正确性。同时,代码需要保持可维护性和扩展性,方便未来可能的修改或功能添加。
以下是为增强数据库安全审计工具而设计的改进方案,包含多数据库连接支持与密码加密分析功能:
python
import os
import pymysql
import openai
import json
import pandas as pd
import re
from typing import List, Dict
from cryptography.fernet import Fernet
====== 密码加密模式识别库 ======
class CryptoDetector:
    """密码加密模式识别与解密建议 (password crypto-type detection and advice).

    Reconstructed from a corrupted paste: the regex character classes and
    dict subscripts had lost their square brackets.
    """

    # Hash formats keyed by algorithm name.
    HASH_PATTERNS = {
        'md5': re.compile(r'^[a-fA-F0-9]{32}$'),
        'sha1': re.compile(r'^[a-fA-F0-9]{40}$'),
        'bcrypt': re.compile(r'^\$2[aby]?\$[0-9]+\$[a-zA-Z0-9./]+$'),
        'pbkdf2': re.compile(r'^\$pbkdf2-.+$'),
    }
    # Cipher-text heuristics (base64-looking payloads).
    CIPHER_PATTERNS = {
        'aes': re.compile(r'^[A-Za-z0-9+/=]{24,}$'),
        'base64': re.compile(r'^[A-Za-z0-9+/=]+$'),
    }

    @classmethod
    def detect_crypto_type(cls, sample) -> dict:
        """检测加密类型及置信度 — return {algo: confidence} for every matching pattern."""
        results = {}
        text = str(sample)
        # Hash-type detection
        for algo, pattern in cls.HASH_PATTERNS.items():
            if pattern.match(text):
                results[algo] = 0.9  # confidence score
        # Cipher-type detection
        for cipher, pattern in cls.CIPHER_PATTERNS.items():
            if pattern.match(text):
                results[cipher] = 0.7
        # Special custom-encryption prefix
        if text.startswith('{ENC}'):
            results['custom'] = 0.8
        return results

    @classmethod
    def get_decrypt_advice(cls, crypto_types) -> str:
        """生成解密建议 — one line of advice per detected type, highest confidence first."""
        advice = []
        for algo, score in sorted(crypto_types.items(), key=lambda x: -x[1]):
            if algo == 'md5':
                advice.append("MD5哈希:使用彩虹表碰撞(如hashcat -m 0)或在线解密网站")
            elif algo == 'bcrypt':
                advice.append("Bcrypt哈希:需GPU集群暴力破解(约$1/10亿次)")
            elif algo == 'aes':
                advice.append("AES加密:需获取密钥和IV(检查配置文件或内存dump)")
            elif algo == 'custom':
                advice.append("自定义加密:逆向工程查找加解密函数")
        return "\n".join(advice) if advice else "无法识别加密类型"
====== 多数据库连接管理 ======
class DBConnectionManager:
    """多数据库连接处理器 — loads (optionally Fernet-encrypted) connection configs.

    Reconstructed from a corrupted paste: `def init` was missing its dunder
    underscores and the dict subscripts had lost their brackets.
    """

    def __init__(self, config_path: str = "db_config.json"):
        # Parsed list of connection-config dicts.
        self.configs = self.load_configs(config_path)

    @staticmethod
    def load_configs(path: str) -> list:
        """Load a JSON list of connection configs from *path*.

        Configs flagged "encrypted" have their password decrypted with a
        Fernet key taken from the DECRYPTION_KEY environment variable
        (Fernet comes from the third-party `cryptography` package).
        """
        with open(path) as f:
            configs = json.load(f)
        for cfg in configs:
            if cfg.get('encrypted'):
                key = os.getenv('DECRYPTION_KEY').encode()
                cipher = Fernet(key)
                cfg['password'] = cipher.decrypt(cfg['password'].encode()).decode()
        return configs
用户当前的脚本只能连接单个数据库,他们希望扩展到多个连接。这意味着需要在配置文件中支持多个数据库配置,比如通过JSON或YAML格式,每个数据库有自己的连接参数。然后,脚本需要遍历这些配置,逐一连接并采集数据。这涉及到修改数据采集部分,使其循环处理每个数据库连接,并可能将结果合并或分别处理。
接下来是关于密文密码的处理。用户提到当遇到密文存储的密码时,需要判断其加密类型,并提供解密建议。常见的加密方式包括哈希(如MD5、SHA-1、bcrypt)、对称加密(如AES、DES)和非对称加密(如RSA)。此外,有些系统可能使用自定义的加密方法或组合多种方式。
首先,我需要考虑如何检测密码字段。可能的方法包括字段名分析(如字段名包含“password”、“pwd”、“hash”等)、数据模式匹配(如MD5哈希是32位十六进制,bcrypt以$2a$开头),以及样本数据分析(如数据是否符合特定加密模式)。
然后,针对检测到的加密类型,给出解密建议。例如,如果是MD5哈希,建议使用彩虹表碰撞或暴力破解;如果是bcrypt,说明其安全性较高,破解难度大;如果是AES加密,则需要获取密钥和IV才能解密。此外,还要考虑是否有现成的工具或库可以辅助解密,比如hashcat对于哈希破解,或OpenSSL用于对称加密解密。
在实现上,可能需要一个加密检测模块,该模块接收字段名和样本数据,返回可能的加密类型和置信度。然后,根据检测结果生成建议。这部分可能需要集成一些正则表达式匹配、特征字符串识别,甚至调用外部库进行深度分析。
同时,用户要求支持多个MySQL连接,这意味着需要重构现有的数据库连接逻辑,从单一的连接处理改为循环处理多个配置。这可能涉及到配置文件的解析,错误处理(如某个数据库连接失败时继续处理下一个),以及结果汇总。
此外,还需要考虑如何将不同数据库的分析结果整合到最终的Excel报告中,可能需要为每个数据库创建单独的工作表,或者在总体报告中区分来源。
在代码结构上,可能需要将数据库连接和数据采集部分封装成函数,便于循环调用。加密检测和解密建议部分可以作为独立的函数或类,供数据采集过程中调用。
测试方面,需要确保多数据库配置的正确性,处理各种可能的异常情况,如无效的配置项、连接超时等。同时,加密检测的准确性也需要通过测试用例验证,比如不同加密类型的样本数据是否能被正确识别。
最后,用户可能还希望脚本能够自动化处理尽可能多的情况,减少手动干预,因此需要详细的日志记录和错误报告,方便用户查看哪些数据库处理成功,哪些失败,以及失败的原因。
总结来说,我需要分步骤处理:首先修改配置处理以支持多数据库,然后调整数据采集逻辑,接着实现加密检测与建议功能,最后整合所有内容并确保输出报告的正确性。同时,代码需要保持可维护性和扩展性,方便未来可能的修改或功能添加。
以下是为增强数据库安全审计工具而设计的改进方案,包含多数据库连接支持与密码加密分析功能:
python
import os
import pymysql
import openai
import json
import pandas as pd
import re
from typing import List, Dict
from cryptography.fernet import Fernet
====== 密码加密模式识别库 ======
class CryptoDetector:
    """密码加密模式识别与解密建议 (password crypto-type detection and advice).

    Reconstructed from a corrupted paste: the regex character classes and
    dict subscripts had lost their square brackets.
    """

    # Hash formats keyed by algorithm name.
    HASH_PATTERNS = {
        'md5': re.compile(r'^[a-fA-F0-9]{32}$'),
        'sha1': re.compile(r'^[a-fA-F0-9]{40}$'),
        'bcrypt': re.compile(r'^\$2[aby]?\$[0-9]+\$[a-zA-Z0-9./]+$'),
        'pbkdf2': re.compile(r'^\$pbkdf2-.+$'),
    }
    # Cipher-text heuristics (base64-looking payloads).
    CIPHER_PATTERNS = {
        'aes': re.compile(r'^[A-Za-z0-9+/=]{24,}$'),
        'base64': re.compile(r'^[A-Za-z0-9+/=]+$'),
    }

    @classmethod
    def detect_crypto_type(cls, sample) -> dict:
        """检测加密类型及置信度 — return {algo: confidence} for every matching pattern."""
        results = {}
        text = str(sample)
        # Hash-type detection
        for algo, pattern in cls.HASH_PATTERNS.items():
            if pattern.match(text):
                results[algo] = 0.9  # confidence score
        # Cipher-type detection
        for cipher, pattern in cls.CIPHER_PATTERNS.items():
            if pattern.match(text):
                results[cipher] = 0.7
        # Special custom-encryption prefix
        if text.startswith('{ENC}'):
            results['custom'] = 0.8
        return results

    @classmethod
    def get_decrypt_advice(cls, crypto_types) -> str:
        """生成解密建议 — one line of advice per detected type, highest confidence first."""
        advice = []
        for algo, score in sorted(crypto_types.items(), key=lambda x: -x[1]):
            if algo == 'md5':
                advice.append("MD5哈希:使用彩虹表碰撞(如hashcat -m 0)或在线解密网站")
            elif algo == 'bcrypt':
                advice.append("Bcrypt哈希:需GPU集群暴力破解(约$1/10亿次)")
            elif algo == 'aes':
                advice.append("AES加密:需获取密钥和IV(检查配置文件或内存dump)")
            elif algo == 'custom':
                advice.append("自定义加密:逆向工程查找加解密函数")
        return "\n".join(advice) if advice else "无法识别加密类型"
====== 多数据库连接管理 ======
class DBConnectionManager:
    """多数据库连接处理器 — loads (optionally Fernet-encrypted) connection configs.

    Reconstructed from a corrupted paste: `def init` was missing its dunder
    underscores and the dict subscripts had lost their brackets.
    """

    def __init__(self, config_path: str = "db_config.json"):
        # Parsed list of connection-config dicts.
        self.configs = self.load_configs(config_path)

    @staticmethod
    def load_configs(path: str) -> list:
        """Load a JSON list of connection configs from *path*.

        Configs flagged "encrypted" have their password decrypted with a
        Fernet key taken from the DECRYPTION_KEY environment variable
        (Fernet comes from the third-party `cryptography` package).
        """
        with open(path) as f:
            configs = json.load(f)
        for cfg in configs:
            if cfg.get('encrypted'):
                key = os.getenv('DECRYPTION_KEY').encode()
                cipher = Fernet(key)
                cfg['password'] = cipher.decrypt(cfg['password'].encode()).decode()
        return configs
# 导出总体报告
export_overall_report(writer, analysis_result)
# 通用字段解释
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
# 导出服务器信息
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
    """End-to-end flow: connect, collect, analyse with OpenAI, export to Excel."""
    conn = connect_db()
    if not conn:
        return
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    conn.close()
    logging.info("整体流程完成!")


# BUG FIX: was `if name == "main":` — the entry-point guard must compare the
# __name__ dunder against "__main__".
if __name__ == "__main__":
    main()
export_overall_report(writer, analysis_result)
# 通用字段解释
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
# 导出服务器信息
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
    """End-to-end flow: connect, collect, analyse with OpenAI, export to Excel."""
    conn = connect_db()
    if not conn:
        return
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    conn.close()
    logging.info("整体流程完成!")


# BUG FIX: was `if name == "main":` — the entry-point guard must compare the
# __name__ dunder against "__main__".
if __name__ == "__main__":
    main()
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Call the OpenAI chat API with retries, pausing REQUEST_DELAY seconds after each request.

    Args:
        prompt: user-message text sent after a fixed Chinese system prompt.

    Returns:
        The stripped completion text, or "" if every one of MAX_RETRY attempts fails.
    """
    openai.api_key = OPENAI_API_KEY
    for attempt in range(MAX_RETRY):
        try:
            response = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全分析专家。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=2000
            )
            # Throttle even on success so consecutive calls stay REQUEST_DELAY apart.
            time.sleep(REQUEST_DELAY)
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
            time.sleep(REQUEST_DELAY)
    return ""
def analyze_with_openai(data):
    """Send the collected DB/server data to OpenAI and parse the reply as JSON.

    Returns the parsed dict, or {} when the response is not valid JSON.

    NOTE(review): the format example in the prompt uses single quotes, but the
    reply is parsed with json.loads, which requires double-quoted strict JSON —
    parsing is likely to fail unless the model normalizes the quoting.
    """
    logging.info("正在通过 OpenAI 分析...")
    prompt = (
        "请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
        "识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
        "视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
        "请结合字段名和样本数据双重判断敏感信息。"
        "\n另外,请分析这些数据是否用于支撑某个 Web 服务器的功能或内容,并说明该 Web 服务器的可能用途(如电商、内容发布、监控等)。"
        "\n请用中文输出分析结果,格式如下:\n"
        "{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
        " 'server_analysis': {服务器相关风险描述及 Web 服务器功能说明},\n"
        " 'access_analysis': {访问记录相关风险描述}\n}\n\n"
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )
    response = call_openai_api(prompt)
    try:
        analysis_result = json.loads(response)
        logging.info("OpenAI 分析完成!")
        return analysis_result
    except json.JSONDecodeError:
        logging.error("OpenAI 响应解析失败,原始响应:" + response)
        return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
    """Write the "总体报告" (overall report) sheet.

    One row per analysis item: the item key, a pretty-printed JSON summary of
    its value, and a fixed Chinese explanation of what the item means.

    Args:
        writer: an open pandas ExcelWriter.
        analysis_result: dict as returned by analyze_with_openai; missing keys
            are rendered as empty objects.
    """
    explanations = {
        "sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
        "server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务器的功能或内容。",
        "access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
    }
    report_rows = []
    for key, explanation in explanations.items():
        value = analysis_result.get(key, {})
        report_rows.append({
            "分析项": key,
            "结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
            "中文解释": explanation
        })
    report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
    report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
    """
    Write *df* to *sheet_name*, inserting one extra row directly under the
    header that carries the Chinese explanation for each column (empty
    string for columns without an entry in *field_explanations*).
    """
    explanation_row = [field_explanations.get(col, "") for col in df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
    combined_df = pd.concat([explanation_df, df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制,且每个请求间隔 1 秒"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_with_openai(data):
    """Ask OpenAI to analyze DB structure, server info and the web-server role they support.

    Args:
        data: dict of collected artifacts (structure, server info, access
            logs, log config, file privileges, UDF info); serialized into
            the prompt as JSON.

    Returns:
        dict: the model's analysis, or {} when the response cannot be parsed.
    """
    import ast  # local import: only needed for the lenient fallback parse

    logging.info("正在通过 OpenAI 分析...")
    prompt = (
        "请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
        "识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
        "视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
        "请结合字段名和样本数据双重判断敏感信息。"
        "\n另外,请分析这些数据是否用于支撑某个 Web 服务器的功能或内容,并说明该 Web 服务器的可能用途(如电商、内容发布、监控等)。"
        "\n请用中文输出分析结果,格式如下:\n"
        "{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
        " 'server_analysis': {服务器相关风险描述及 Web 服务器功能说明},\n"
        " 'access_analysis': {访问记录相关风险描述}\n}\n\n"
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )
    response = call_openai_api(prompt)
    # Bug fix: the prompt above asks for single-quoted pseudo-JSON and models
    # often wrap answers in Markdown code fences, so a bare json.loads almost
    # always failed. Strip fences first, then try strict JSON, then fall back
    # to ast.literal_eval for single-quoted dict literals.
    text = response.strip()
    if text.startswith("```"):
        text = text.strip("`")
        newline = text.find("\n")
        # Drop a leading language tag line such as "json" or "python".
        if newline != -1 and text[:newline].strip().lower() in ("json", "python"):
            text = text[newline + 1:]
    try:
        analysis_result = json.loads(text)
        logging.info("OpenAI 分析完成!")
        return analysis_result
    except json.JSONDecodeError:
        pass
    try:
        analysis_result = ast.literal_eval(text)
        if isinstance(analysis_result, dict):
            logging.info("OpenAI 分析完成!")
            return analysis_result
    except (ValueError, SyntaxError):
        pass
    logging.error("OpenAI 响应解析失败,原始响应:" + response)
    return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
    """Write the "总体报告" worksheet: one row per analysis item, pairing a
    JSON summary of the result with a fixed Chinese explanation."""
    explanations = {
        "sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
        "server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,以及该服务器支持的 Web 服务器的功能或内容。",
        "access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
    }
    # Build every report row in one pass; absent keys default to {}.
    rows = [
        {
            "分析项": item,
            "结果摘要": json.dumps(analysis_result.get(item, {}), ensure_ascii=False, indent=2),
            "中文解释": note,
        }
        for item, note in explanations.items()
    ]
    pd.DataFrame(rows, columns=["分析项", "结果摘要", "中文解释"]).to_excel(
        writer, sheet_name="总体报告", index=False
    )
def export_field_explanations(writer, sheet_name, df, field_explanations):
    """Write *df* to *sheet_name* with an extra first row holding each
    column's Chinese explanation (empty when no explanation is given)."""
    notes = pd.DataFrame(
        [[field_explanations.get(column, "") for column in df.columns]],
        columns=df.columns,
    )
    # Explanation row sits directly below the header, above the data rows.
    pd.concat([notes, df], ignore_index=True).to_excel(
        writer, sheet_name=sheet_name, index=False
    )
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
import os
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# ============ 配置区 ============
DB_CONFIG = {
'host': os.getenv("DB_HOST", "localhost"),
'user': os.getenv("DB_USER", "root"),
'password': os.getenv("DB_PASSWORD", "yourpassword"),
'port': int(os.getenv("DB_PORT", 3306)),
'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3)) # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1)) # 每个请求延迟 1 秒
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
try:
cursor.execute(f"USE
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# ============ 配置区 ============
DB_CONFIG = {
'host': os.getenv("DB_HOST", "localhost"),
'user': os.getenv("DB_USER", "root"),
'password': os.getenv("DB_PASSWORD", "yourpassword"),
'port': int(os.getenv("DB_PORT", 3306)),
'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3)) # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1)) # 每个请求延迟 1 秒
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
    """Open a MySQL connection from DB_CONFIG; return it, or None on failure."""
    try:
        connection = pymysql.connect(**DB_CONFIG)
    except Exception as exc:
        logging.error(f"数据库连接失败: {exc}")
        return None
    logging.info("数据库连接成功")
    return connection
# ====== 数据采集 ======
def collect_db_info(conn):
    """
    Enumerate databases, tables, columns, sample rows, server info, access
    records, log configuration, file privileges and UDF info.

    Args:
        conn: an open DB-API connection (pymysql) whose cursor supports the
            context-manager protocol.

    Returns:
        tuple: (db_structure, server_info, access_logs, log_configs,
        file_privileges, udf_info). Each element degrades to an empty
        container when the current user lacks the required privilege.

    Note:
        This rebuilds a block whose original text was corrupted: the
        f-string SQL statements (USE/DESCRIBE/SELECT) were split across
        lines and the sample-fetch/return tail was missing. Identifiers
        cannot be bound as query parameters, so they are backtick-quoted.
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []
    with conn.cursor() as cursor:
        try:
            # Server facts: version, hostname, port, timezone, data directory.
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")
        try:
            # Client/access records (requires the PROCESS privilege).
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
        try:
            # Logging-related server variables.
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))
        try:
            # File privileges (sample query; may need adjusting per deployment).
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))
        try:
            # Registered UDFs (potential privilege-escalation vector).
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))
        try:
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []
        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas
            try:
                # Identifiers cannot be parameterized; backtick-quote them so
                # unusual database names do not break the statement.
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue
            db_structure[db] = {}
            for table in tables:
                try:
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue
                try:
                    # First 5 rows as a sample for sensitivity analysis.
                    cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
                    samples = []
                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }
    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
# 导出服务器信息
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
    """Orchestrate the run: connect, collect, analyze via OpenAI, export to Excel."""
    conn = connect_db()
    if not conn:
        return
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    conn.close()
    logging.info("整体流程完成!")
# Bug fix: the guard compared the undefined names `name`/`main` (NameError at
# import time); the dunder form is required for script entry.
if __name__ == "__main__":
    main()
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
# 导出服务器信息
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
    """Orchestrate the run: connect, collect, analyze via OpenAI, export to Excel."""
    conn = connect_db()
    if not conn:
        return
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    conn.close()
    logging.info("整体流程完成!")
# Bug fix: the guard compared the undefined names `name`/`main` (NameError at
# import time); the dunder form is required for script entry.
if __name__ == "__main__":
    main()
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Send *prompt* to the chat-completion endpoint with up to MAX_RETRY
    attempts, sleeping REQUEST_DELAY seconds after every request; returns the
    stripped reply text, or "" when all attempts fail."""
    openai.api_key = OPENAI_API_KEY
    attempt = 0
    while attempt < MAX_RETRY:
        try:
            reply = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全分析专家。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=2000
            )
        except Exception as e:
            # Log, throttle, and move on to the next attempt.
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
            time.sleep(REQUEST_DELAY)
            attempt += 1
            continue
        time.sleep(REQUEST_DELAY)
        return reply['choices'][0]['message']['content'].strip()
    return ""
def analyze_with_openai(data):
    """Ask OpenAI to analyze DB structure and penetration risks.

    Args:
        data: dict of collected artifacts; serialized into the prompt as JSON.

    Returns:
        dict: the model's analysis parsed as JSON, or {} on parse failure.
    """
    logging.info("正在通过 OpenAI 分析...")
    # NOTE(review): the requested format below uses single quotes, which
    # json.loads cannot parse — likely to hit the except branch; confirm.
    prompt = (
        "请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
        "识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
        "视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
        "请结合字段名和样本数据双重判断敏感信息。请用中文输出分析结果,格式如下:\n"
        "{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
        " 'server_analysis': {服务器相关风险描述},\n"
        " 'access_analysis': {访问记录相关风险描述}\n}\n\n"
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )
    response = call_openai_api(prompt)
    try:
        analysis_result = json.loads(response)
        logging.info("OpenAI 分析完成!")
        return analysis_result
    except json.JSONDecodeError:
        logging.error("OpenAI 响应解析失败,原始响应:" + response)
        return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,表明服务器安全态势。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
    """
    Write *df* to *sheet_name*, inserting one extra row directly under the
    header that carries the Chinese explanation for each column (empty
    string for columns without an entry in *field_explanations*).
    """
    explanation_row = [field_explanations.get(col, "") for col in df.columns]
    explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
    combined_df = pd.concat([explanation_df, df], ignore_index=True)
    combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 通用字段解释
common_field_explanations = {
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制,且每个请求间隔 1 秒"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据双重判断敏感信息。请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {服务器相关风险描述},\n"
" 'access_analysis': {访问记录相关风险描述}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,表明服务器安全态势。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
explanation_row = [field_explanations.get(col, "") for col in df.columns]
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 通用字段解释
common_field_explanations = {
import os
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# ============ 配置区 ============
DB_CONFIG = {
'host': os.getenv("DB_HOST", "localhost"),
'user': os.getenv("DB_USER", "root"),
'password': os.getenv("DB_PASSWORD", "yourpassword"),
'port': int(os.getenv("DB_PORT", 3306)),
'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3)) # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1)) # 每个请求延迟 1 秒
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
    """Open a MySQL connection from DB_CONFIG; return it, or None on failure."""
    try:
        conn = pymysql.connect(**DB_CONFIG)
        logging.info("数据库连接成功")
        return conn
    except Exception as e:
        # Failure is non-fatal here; callers treat None as "abort the run".
        logging.error(f"数据库连接失败: {e}")
        return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
try:
cursor.execute(f"USE
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue
import pymysql
import openai
import json
import pandas as pd
import time
import logging
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# ============ 配置区 ============
DB_CONFIG = {
'host': os.getenv("DB_HOST", "localhost"),
'user': os.getenv("DB_USER", "root"),
'password': os.getenv("DB_PASSWORD", "yourpassword"),
'port': int(os.getenv("DB_PORT", 3306)),
'charset': os.getenv("DB_CHARSET", "utf8mb4")
}
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "sensitive_data_analysis.xlsx")
MAX_RETRY = int(os.getenv("MAX_RETRY", 3)) # OpenAI API 重试次数
REQUEST_DELAY = int(os.getenv("REQUEST_DELAY", 1)) # 每个请求延迟 1 秒
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
    """
    Enumerate databases, tables, columns, sample rows, server info, access
    records, log configuration, file privileges and UDF info.

    Args:
        conn: an open DB-API connection (pymysql) whose cursor supports the
            context-manager protocol.

    Returns:
        tuple: (db_structure, server_info, access_logs, log_configs,
        file_privileges, udf_info). Each element degrades to an empty
        container when the current user lacks the required privilege.

    Note:
        This rebuilds a block whose original text was corrupted: the
        f-string SQL statements (USE/DESCRIBE/SELECT) were split across
        lines and the sample-fetch/return tail was missing. Identifiers
        cannot be bound as query parameters, so they are backtick-quoted.
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []
    with conn.cursor() as cursor:
        try:
            # Server facts: version, hostname, port, timezone, data directory.
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")
        try:
            # Client/access records (requires the PROCESS privilege).
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
        try:
            # Logging-related server variables.
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))
        try:
            # File privileges (sample query; may need adjusting per deployment).
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))
        try:
            # Registered UDFs (potential privilege-escalation vector).
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))
        try:
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []
        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas
            try:
                # Identifiers cannot be parameterized; backtick-quote them so
                # unusual database names do not break the statement.
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue
            db_structure[db] = {}
            for table in tables:
                try:
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue
                try:
                    # First 5 rows as a sample for sensitivity analysis.
                    cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
                    samples = []
                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }
    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# 导出服务器信息
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 定义每个数据表字段的中文解释(根据实际情况调整)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
    """Orchestrate the run: connect, collect, analyze via OpenAI, export to Excel."""
    conn = connect_db()
    if not conn:
        return
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    conn.close()
    logging.info("整体流程完成!")
# Bug fix: the guard compared the undefined names `name`/`main` (NameError at
# import time); the dunder form is required for script entry.
if __name__ == "__main__":
    main()
sheet_name = '服务器信息'
server_df = pd.DataFrame([server_info])
export_field_explanations(writer, sheet_name, server_df, common_field_explanations)
# 导出日志配置(转换为两列格式)
sheet_name = '日志配置'
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_explanations = {
"配置项": "日志相关的配置项",
"值": "对应配置项的值"
}
export_field_explanations(writer, sheet_name, log_df, log_explanations)
# 导出访问记录
if access_logs:
sheet_name = '访问记录'
access_df = pd.DataFrame(access_logs)
access_field_explanations = {col: "访问记录字段" for col in access_df.columns}
export_field_explanations(writer, sheet_name, access_df, access_field_explanations)
# 导出文件权限
if file_privileges:
sheet_name = '文件权限'
file_df = pd.DataFrame(file_privileges)
file_field_explanations = {col: "文件权限相关字段" for col in file_df.columns}
export_field_explanations(writer, sheet_name, file_df, file_field_explanations)
# 导出 UDF 信息
if udf_info:
sheet_name = 'UDF信息'
udf_df = pd.DataFrame(udf_info)
udf_field_explanations = {col: "UDF 信息字段" for col in udf_df.columns}
export_field_explanations(writer, sheet_name, udf_df, udf_field_explanations)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 定义每个数据表字段的中文解释(根据实际情况调整)
field_explanations = {col: f"{col} 的中文解释" for col in columns}
sheet_name = f"{db}_{table}"[:31]
export_field_explanations(writer, sheet_name, df, field_explanations)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
    """Orchestrate the run: connect, collect, analyze via OpenAI, export to Excel."""
    conn = connect_db()
    if not conn:
        return
    db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
    analysis_data = {
        "db_structure": db_structure,
        "server_info": server_info,
        "access_logs": access_logs,
        "log_configs": log_configs,
        "file_privileges": file_privileges,
        "udf_info": udf_info
    }
    analysis_result = analyze_with_openai(analysis_data)
    export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    conn.close()
    logging.info("整体流程完成!")
# Bug fix: the guard compared the undefined names `name`/`main` (NameError at
# import time); the dunder form is required for script entry.
if __name__ == "__main__":
    main()
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Call the OpenAI chat API with retry; pause REQUEST_DELAY seconds between requests.

    Args:
        prompt: user-message text sent to the model.

    Returns:
        str: the stripped completion text, or "" after MAX_RETRY failures.
    """
    openai.api_key = OPENAI_API_KEY
    for attempt in range(MAX_RETRY):
        try:
            response = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全分析专家。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=2000
            )
            time.sleep(REQUEST_DELAY)  # throttle: delay after a successful request too
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
            time.sleep(REQUEST_DELAY)
    return ""
def analyze_with_openai(data):
    """Run the OpenAI sensitive-data / penetration-risk analysis over *data*.

    Builds a Chinese-language prompt embedding the collected database/server
    data as JSON, sends it via call_openai_api, and parses the reply as JSON.
    Returns {} (after logging the raw reply) when parsing fails.
    """
    logging.info("正在通过 OpenAI 分析...")
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    request_text = (
        "请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
        "识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
        "视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
        "请结合字段名和样本数据双重判断敏感信息。请用中文输出分析结果,格式如下:\n"
        "{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
        " 'server_analysis': {服务器相关风险描述},\n"
        " 'access_analysis': {访问记录相关风险描述}\n}\n\n"
        f"数据如下:\n{payload}"
    )
    raw_reply = call_openai_api(request_text)
    try:
        parsed = json.loads(raw_reply)
    except json.JSONDecodeError:
        logging.error("OpenAI 响应解析失败,原始响应:" + raw_reply)
        return {}
    logging.info("OpenAI 分析完成!")
    return parsed
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
    """Write the "总体报告" sheet: one row per analysis item.

    Each row carries the item key, a JSON summary of its value from
    *analysis_result* (missing keys become {}), and a fixed Chinese
    explanation of what that item means.
    """
    explanations = {
        "sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
        "server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,表明服务器安全态势。",
        "access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
    }
    rows = [
        {
            "分析项": item,
            "结果摘要": json.dumps(analysis_result.get(item, {}), ensure_ascii=False, indent=2),
            "中文解释": note
        }
        for item, note in explanations.items()
    ]
    pd.DataFrame(rows, columns=["分析项", "结果摘要", "中文解释"]).to_excel(
        writer, sheet_name="总体报告", index=False
    )
def export_field_explanations(writer, sheet_name, df, field_explanations):
    """Write *df* to *sheet_name* with a per-column explanation row on top.

    *field_explanations* maps column name -> Chinese explanation; columns
    without an entry get an empty cell in the inserted first row.
    """
    notes = [field_explanations.get(column, "") for column in df.columns]
    notes_df = pd.DataFrame([notes], columns=df.columns)
    # Explanation row goes above the data rows.
    pd.concat([notes_df, df], ignore_index=True).to_excel(
        writer, sheet_name=sheet_name, index=False
    )
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
    """导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释

    Writes OUTPUT_FILE with: the overall report, server info (with common
    field explanations), log configuration, access records, file privileges,
    UDF info, and one sheet per sampled table whose first row marks the
    sensitive columns flagged by the analysis.

    NOTE(review): the original body was truncated mid-function after the
    ``common_field_explanations`` dict (paste corruption); the remainder is
    reconstructed from the per-table export loop visible elsewhere in this
    file — confirm against the intended version.
    """
    logging.info("正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # Overall analysis report first.
        export_overall_report(writer, analysis_result)
        # Common field explanations (extend as needed).
        common_field_explanations = {
            "版本": "数据库版本号",
            "主机名": "数据库所在主机名称",
            "端口": "数据库服务端口",
            "时区": "服务器时区设置",
            "数据目录": "数据库数据存放目录"
        }
        # Server info with its explanation row.
        server_df = pd.DataFrame([server_info])
        export_field_explanations(writer, '服务器信息', server_df, common_field_explanations)
        # Log configuration as two columns.
        log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
        log_df.to_excel(writer, sheet_name='日志配置', index=False)
        # Optional sheets: only written when data was collected.
        if access_logs:
            pd.DataFrame(access_logs).to_excel(writer, sheet_name='访问记录', index=False)
        if file_privileges:
            pd.DataFrame(file_privileges).to_excel(writer, sheet_name='文件权限', index=False)
        if udf_info:
            pd.DataFrame(udf_info).to_excel(writer, sheet_name='UDF信息', index=False)
        # One sheet per sampled table, with sensitive-field marks.
        for db, tables in db_structure.items():
            for table, content in tables.items():
                columns = content['columns']
                df = pd.DataFrame(content['samples'], columns=columns)
                sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
                sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)
                # Placeholder per-column explanations (adjust per project).
                field_explanations = {col: f"{col} 的中文解释" for col in columns}
                sheet_name = f"{db}_{table}"[:31]  # Excel caps sheet names at 31 chars
                export_field_explanations(writer, sheet_name, df, field_explanations)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制,且每个请求间隔 1 秒"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析专家。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
time.sleep(REQUEST_DELAY) # 请求后延迟 1 秒
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(REQUEST_DELAY)
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"请基于下面提供的数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,"
"识别可能的敏感信息和潜在的渗透风险,包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、"
"视频监控流地址、日志配置问题、文件读写权限问题、UDF 提权风险等。字段名可能为中文、拼音或缩写,"
"请结合字段名和样本数据双重判断敏感信息。请用中文输出分析结果,格式如下:\n"
"{\n 'sensitive_fields': {数据库: {表: [敏感字段, ...], ...}, ...},\n"
" 'server_analysis': {服务器相关风险描述},\n"
" 'access_analysis': {访问记录相关风险描述}\n}\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,内容详略得当,并增加每个字段的中文解释
"""
# 定义总体报告字段解释
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,例如身份证号、手机号、密码等,表明数据中存在可能泄露个人隐私的信息。",
"server_analysis": "对服务器配置及安全设置的分析,包括数据库版本、日志配置、文件权限、UDF 提权风险等,表明服务器安全态势。",
"access_analysis": "对访问记录的分析,展示访问数据库的客户端情况以及可能的异常或潜在风险。"
}
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_field_explanations(writer, sheet_name, df, field_explanations):
"""
在导出的每个工作表中,增加表头下方的字段中文解释行
"""
# field_explanations: dict, 键为字段名,值为中文解释
explanation_row = []
for col in df.columns:
explanation_row.append(field_explanations.get(col, ""))
# 在df上方插入解释行
explanation_df = pd.DataFrame([explanation_row], columns=df.columns)
combined_df = pd.concat([explanation_df, df], ignore_index=True)
combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告及每个表字段的中文解释"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 定义通用字段解释(可根据实际情况扩展)
common_field_explanations = {
"版本": "数据库版本号",
"主机名": "数据库所在主机名称",
"端口": "数据库服务端口",
"时区": "服务器时区设置",
"数据目录": "数据库数据存放目录"
}
import pymysql
import openai
import json
import pandas as pd
import time
import logging
# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数
REQUEST_DELAY = 1 # 每个请求延迟 1 秒,防止被滥用阻止
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
    """Open a PyMySQL connection using DB_CONFIG.

    Returns the connection object, or None when the connection attempt
    fails (the error is logged).
    """
    try:
        connection = pymysql.connect(**DB_CONFIG)
    except Exception as e:
        logging.error(f"数据库连接失败: {e}")
        return None
    logging.info("数据库连接成功")
    return connection
# ====== 数据采集 ======
def collect_db_info(conn):
    """
    枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息

    Returns:
        tuple: (db_structure, server_info, access_logs, log_configs,
                file_privileges, udf_info), where db_structure maps
                database -> table -> {"columns": [...], "samples": [...]}.

    NOTE(review): the per-database/per-table SQL statements in the original
    were truncated mid f-string (backtick-quoted identifiers were mangled by
    a paste), and the function's tail (the db_structure assignment and the
    return) was missing. Both are reconstructed here — confirm against the
    intended version.
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []
    with conn.cursor() as cursor:
        try:
            # Server information: version, host, port, timezone, data dir.
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")
        try:
            # Access records (requires the PROCESS privilege).
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
        try:
            # Logging-related server variables.
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))
        try:
            # File/process privileges (sample query; adjust per environment).
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))
        try:
            # Registered UDFs (potential privilege-escalation vector).
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))
        try:
            # Enumerate databases for structure/sample collection.
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []
        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas
            try:
                # Identifiers come from SHOW DATABASES; backtick-quote them
                # since they cannot be bound as query parameters.
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue
            db_structure[db] = {}
            for table in tables:
                try:
                    # Column names for the table.
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue
                try:
                    # First 5 rows as a data sample.
                    cursor.execute(f"SELECT * FROM `{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
                    samples = []
                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }
    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
import openai
import json
import pandas as pd
import time
import logging
# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数
REQUEST_DELAY = 1 # 每个请求延迟 1 秒,防止被滥用阻止
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
try:
cursor.execute(f"USE
{db}")cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE
{table}")columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 处理 sheet 名称长度和重复问题
sheet_name = f"{db}_{table}"[:31]
df.to_excel(writer, sheet_name=sheet_name, index=False)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
    """Entry point: collect database info, analyze it via OpenAI, export to Excel.

    Aborts early when the database connection cannot be established.
    The connection is always closed, even if analysis or export fails.
    """
    conn = connect_db()
    if not conn:
        return
    try:
        db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
        analysis_data = {
            "db_structure": db_structure,
            "server_info": server_info,
            "access_logs": access_logs,
            "log_configs": log_configs,
            "file_privileges": file_privileges,
            "udf_info": udf_info
        }
        analysis_result = analyze_with_openai(analysis_data)
        export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    finally:
        # Release the connection even when analysis/export raises.
        conn.close()
    logging.info("整体流程完成!")
# Fixed: the original guard read `if name == "main":`, which raises NameError
# at import time; the dunder form is required.
if __name__ == "__main__":
    main()
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 处理 sheet 名称长度和重复问题
sheet_name = f"{db}_{table}"[:31]
df.to_excel(writer, sheet_name=sheet_name, index=False)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
    """Entry point: collect database info, analyze it via OpenAI, export to Excel.

    Aborts early when the database connection cannot be established.
    The connection is always closed, even if analysis or export fails.
    """
    conn = connect_db()
    if not conn:
        return
    try:
        db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
        analysis_data = {
            "db_structure": db_structure,
            "server_info": server_info,
            "access_logs": access_logs,
            "log_configs": log_configs,
            "file_privileges": file_privileges,
            "udf_info": udf_info
        }
        analysis_result = analyze_with_openai(analysis_data)
        export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
    finally:
        # Release the connection even when analysis/export raises.
        conn.close()
    logging.info("整体流程完成!")
# Fixed: the original guard read `if name == "main":`, which raises NameError
# at import time; the dunder form is required.
if __name__ == "__main__":
    main()
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
    """Send *prompt* to the OpenAI chat API and return the stripped reply text.

    Retries up to MAX_RETRY times, sleeping 2 seconds after each failed
    attempt; returns "" when every attempt fails.
    """
    openai.api_key = OPENAI_API_KEY
    for attempt in range(MAX_RETRY):
        try:
            response = openai.ChatCompletion.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个数据库安全分析助手。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=2000
            )
            return response['choices'][0]['message']['content'].strip()
        except Exception as e:
            logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
            # Back off briefly before the next retry.
            time.sleep(2)
    return ""
def analyze_with_openai(data):
    """Ask OpenAI to flag sensitive fields / penetration risks in *data*.

    The prompt requests a JSON object with 'sensitive_fields',
    'server_analysis' and 'access_analysis' keys. Returns {} (after logging
    the raw reply) when the response is not valid JSON.
    """
    logging.info("正在通过 OpenAI 分析...")
    prompt = (
        "以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
        "包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
        "字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
        "请用中文输出分析结果,输出格式为:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
        f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
    )
    response = call_openai_api(prompt)
    try:
        analysis_result = json.loads(response)
        logging.info("OpenAI 分析完成!")
        return analysis_result
    except json.JSONDecodeError:
        # NOTE(review): models often wrap JSON in prose or code fences,
        # which would land here — keep the raw reply in the log.
        logging.error("OpenAI 响应解析失败,原始响应:" + response)
        return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
    """Write the "总体报告" sheet: one row per analysis item.

    Each row holds the item key, a JSON summary of its value from
    *analysis_result* (missing keys become {}), and a fixed Chinese
    explanation of the item.
    """
    # Fixed Chinese explanations for each analysis item.
    explanations = {
        "sensitive_fields": "数据库中被识别出的敏感字段,如身份证号、手机号、密码等。",
        "server_analysis": "服务器相关分析结果,包括版本、日志配置、文件权限、UDF 提权风险等信息。",
        "access_analysis": "访问记录分析结果,展示访问数据库的服务器信息及潜在风险。"
    }
    # Build the report rows.
    report_rows = []
    for key, explanation in explanations.items():
        value = analysis_result.get(key, {})
        # Serialize the value as JSON text so it displays cleanly in a cell.
        report_rows.append({
            "分析项": key,
            "结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
            "中文解释": explanation
        })
    report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
    report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 导出服务器信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
# 导出日志配置(调整为两列展示)
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_df.to_excel(writer, sheet_name='日志配置', index=False)
# 导出访问记录
if access_logs:
access_df = pd.DataFrame(access_logs)
access_df.to_excel(writer, sheet_name='访问记录', index=False)
# 导出文件权限
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_df.to_excel(writer, sheet_name='文件权限', index=False)
# 导出 UDF 信息
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_df.to_excel(writer, sheet_name='UDF信息', index=False)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(2)
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
"请用中文输出分析结果,输出格式为:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_overall_report(writer, analysis_result):
"""
导出总体报告工作表,每个字段增加中文解释
"""
# 定义字段中文解释
explanations = {
"sensitive_fields": "数据库中被识别出的敏感字段,如身份证号、手机号、密码等。",
"server_analysis": "服务器相关分析结果,包括版本、日志配置、文件权限、UDF 提权风险等信息。",
"access_analysis": "访问记录分析结果,展示访问数据库的服务器信息及潜在风险。"
}
# 构造总体报告数据
report_rows = []
for key, explanation in explanations.items():
value = analysis_result.get(key, {})
# 将 value 转换为字符串格式,便于展示
report_rows.append({
"分析项": key,
"结果摘要": json.dumps(value, ensure_ascii=False, indent=2),
"中文解释": explanation
})
report_df = pd.DataFrame(report_rows, columns=["分析项", "结果摘要", "中文解释"])
report_df.to_excel(writer, sheet_name="总体报告", index=False)
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel,包括总体报告"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出总体报告
export_overall_report(writer, analysis_result)
# 导出服务器信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
# 导出日志配置(调整为两列展示)
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_df.to_excel(writer, sheet_name='日志配置', index=False)
# 导出访问记录
if access_logs:
access_df = pd.DataFrame(access_logs)
access_df.to_excel(writer, sheet_name='访问记录', index=False)
# 导出文件权限
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_df.to_excel(writer, sheet_name='文件权限', index=False)
# 导出 UDF 信息
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_df.to_excel(writer, sheet_name='UDF信息', index=False)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
import pymysql
import openai
import json
import pandas as pd
import time
import logging
# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
    """Open a PyMySQL connection using DB_CONFIG; return None on failure."""
    try:
        conn = pymysql.connect(**DB_CONFIG)
        logging.info("数据库连接成功")
        return conn
    except Exception as e:
        # Connection failure is logged and reported to the caller via None.
        logging.error(f"数据库连接失败: {e}")
        return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
try:
cursor.execute(f"USE
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
import openai
import json
import pandas as pd
import time
import logging
# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""
枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置、文件权限、UDF 信息
"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
try:
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
try:
cursor.execute(f"USE
{db}")cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE
{table}")columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
db_structure, server_info, access_logs, log_configs, file_privileges, udf_info = collect_db_info(conn)
analysis_data = {
"db_structure": db_structure,
"server_info": server_info,
"access_logs": access_logs,
"log_configs": log_configs,
"file_privileges": file_privileges,
"udf_info": udf_info
}
analysis_result = analyze_with_openai(analysis_data)
export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
conn.close()
logging.info("整体流程完成!")
if name == "main":
main()
# ---
# 以上代码在原有基础上增加了异常处理、日志记录和部分数据格式优化,使脚本在面对较大数据量或异常情况时更加健壮和可维护。如果还有其他需求或改进意见,请随时反馈!
analysis_data = {
"db_structure": db_structure,
"server_info": server_info,
"access_logs": access_logs,
"log_configs": log_configs,
"file_privileges": file_privileges,
"udf_info": udf_info
}
analysis_result = analyze_with_openai(analysis_data)
export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result)
conn.close()
logging.info("整体流程完成!")
if name == "main":
main()
# ---
# 以上代码在原有基础上增加了异常处理、日志记录和部分数据格式优化,使脚本在面对较大数据量或异常情况时更加健壮和可维护。如果还有其他需求或改进意见,请随时反馈!
try:
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(2) # 等待后重试
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
"输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
    """导出数据和分析结果到 Excel

    Writes OUTPUT_FILE with sheets for server info, log configuration,
    access records, file privileges, UDF info, and one sheet per sampled
    table (first row marks columns flagged sensitive by the analysis).

    Fix: truncating sheet names to Excel's 31-character limit could make two
    tables collide on the same name, which fails when the second sheet is
    written; a numeric suffix now keeps truncated names unique.
    """
    logging.info("正在导出数据到 Excel...")
    with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
        # Server info.
        pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
        # Log configuration as two columns.
        log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
        log_df.to_excel(writer, sheet_name='日志配置', index=False)
        # Optional sheets: only written when data was collected.
        if access_logs:
            access_df = pd.DataFrame(access_logs)
            access_df.to_excel(writer, sheet_name='访问记录', index=False)
        if file_privileges:
            file_df = pd.DataFrame(file_privileges)
            file_df.to_excel(writer, sheet_name='文件权限', index=False)
        if udf_info:
            udf_df = pd.DataFrame(udf_info)
            udf_df.to_excel(writer, sheet_name='UDF信息', index=False)
        # One sheet per sampled table, with sensitive-field marks.
        seen_names = set()
        for db, tables in db_structure.items():
            for table, content in tables.items():
                data = content['samples']
                columns = content['columns']
                df = pd.DataFrame(data, columns=columns)
                sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
                sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
                mark_row = pd.DataFrame([sensitive_marks], columns=columns)
                df = pd.concat([mark_row, df], ignore_index=True)
                # Cap at 31 chars and de-duplicate truncated names.
                base_name = f"{db}_{table}"[:31]
                sheet_name = base_name
                suffix = 1
                while sheet_name in seen_names:
                    tag = f"~{suffix}"
                    sheet_name = base_name[:31 - len(tag)] + tag
                    suffix += 1
                seen_names.add(sheet_name)
                df.to_excel(writer, sheet_name=sheet_name, index=False)
    logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
conn = connect_db()
if not conn:
return
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
except Exception as e:
logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
samples = []
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析,并增加重试机制"""
openai.api_key = OPENAI_API_KEY
for attempt in range(MAX_RETRY):
try:
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}
],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
logging.warning(f"OpenAI API 调用失败,尝试 {attempt + 1}/{MAX_RETRY},错误:{e}")
time.sleep(2) # 等待后重试
return ""
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
logging.info("正在通过 OpenAI 分析...")
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"字段名可能为中文、拼音或缩写,请结合字段名和样本数据双重判断敏感信息,"
"输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
response = call_openai_api(prompt)
try:
analysis_result = json.loads(response)
logging.info("OpenAI 分析完成!")
return analysis_result
except json.JSONDecodeError:
logging.error("OpenAI 响应解析失败,原始响应:" + response)
return {}
# ====== 导出 Excel ======
def export_to_excel(db_structure, server_info, access_logs, log_configs, file_privileges, udf_info, analysis_result):
"""导出数据和分析结果到 Excel"""
logging.info("正在导出数据到 Excel...")
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# 导出服务器信息
pd.DataFrame([server_info]).to_excel(writer, sheet_name='服务器信息', index=False)
# 导出日志配置(调整为两列展示)
log_df = pd.DataFrame(list(log_configs.items()), columns=['配置项', '值'])
log_df.to_excel(writer, sheet_name='日志配置', index=False)
# 导出访问记录
if access_logs:
access_df = pd.DataFrame(access_logs)
access_df.to_excel(writer, sheet_name='访问记录', index=False)
# 导出文件权限
if file_privileges:
file_df = pd.DataFrame(file_privileges)
file_df.to_excel(writer, sheet_name='文件权限', index=False)
# 导出 UDF 信息
if udf_info:
udf_df = pd.DataFrame(udf_info)
udf_df.to_excel(writer, sheet_name='UDF信息', index=False)
# 导出各数据表及分析结果
for db, tables in db_structure.items():
for table, content in tables.items():
data = content['samples']
columns = content['columns']
df = pd.DataFrame(data, columns=columns)
# 标注敏感字段
sensitive_cols = analysis_result.get('sensitive_fields', {}).get(db, {}).get(table, [])
sensitive_marks = ['敏感字段' if col in sensitive_cols else '' for col in columns]
mark_row = pd.DataFrame([sensitive_marks], columns=columns)
df = pd.concat([mark_row, df], ignore_index=True)
# 处理 sheet 名称长度和重复问题
sheet_name = f"{db}_{table}"
sheet_name = sheet_name[:31]
df.to_excel(writer, sheet_name=sheet_name, index=False)
logging.info(f"数据导出完成:{OUTPUT_FILE}")
# ====== 主流程 ======
def main():
conn = connect_db()
if not conn:
return
下面列出一些可以进一步优化脚本的建议,并提供一个改进后的代码版本供参考:
---
优化建议
1. 异常处理和重试机制
对调用 OpenAI API 增加异常捕获、超时处理及重试机制,保证在网络波动时依然能获得结果。
对数据库查询增加更多异常捕获,避免因单个查询失败导致整个流程中断。
2. 上下文管理和资源释放
使用 with 上下文管理器来管理数据库连接和 Excel Writer,确保资源在异常情况下也能正确释放。
3. 数据格式和列名优化
对导出的日志配置、访问记录等数据进行格式整理,添加列标题,使 Excel 文件更具可读性。
考虑对 sheet 名称冲突或长度进行更严格处理。
4. 参数化查询和安全性
对 SQL 语句使用参数化查询(如果涉及动态参数),以降低 SQL 注入风险(尽管此处主要用于采集数据)。
5. 日志记录
增加日志记录模块,将关键步骤和异常写入日志文件,便于后续排查问题。
---
改进后的代码
import pymysql
import openai
import json
import pandas as pd
import time
import logging
# ============ Configuration ============
# Connection settings for the target MySQL server.
# NOTE(review): credentials are hard-coded placeholders — move them to
# environment variables or a separate config file before real use.
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'yourpassword',
    'port': 3306,
    'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'  # OpenAI API key (placeholder)
MODEL_NAME = 'gpt-4'  # chat model used for the analysis requests
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'  # path of the Excel report
MAX_RETRY = 3  # number of retry attempts for OpenAI API calls
# Root logger: INFO level with a timestamped single-line format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== Database connection ======
def connect_db():
    """Open a MySQL connection using the module-level DB_CONFIG.

    Returns the live pymysql connection on success; on any failure the
    error is logged and None is returned instead of raising.
    """
    try:
        connection = pymysql.connect(**DB_CONFIG)
    except Exception as e:
        logging.error(f"数据库连接失败: {e}")
        return None
    logging.info("数据库连接成功")
    return connection
# ====== Data collection ======
def collect_db_info(conn):
    """Enumerate databases, tables, columns, sample rows and server metadata.

    Parameters
    ----------
    conn : pymysql connection
        An open connection; a cursor is taken from it for all queries.

    Returns
    -------
    tuple
        (db_structure, server_info, access_logs, log_configs,
         file_privileges, udf_info) where db_structure is
        ``{db: {table: {"columns": [...], "samples": [...]}}}``.

    Every query is wrapped individually so a single permission failure
    does not abort the whole collection run.
    """
    logging.info("正在采集数据库信息...")
    db_structure = {}
    server_info = {}
    access_logs = []
    log_configs = {}
    file_privileges = []
    udf_info = []
    with conn.cursor() as cursor:
        try:
            # Basic server metadata.
            cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
            version, hostname, port, timezone, datadir = cursor.fetchone()
            server_info = {
                '版本': version,
                '主机名': hostname,
                '端口': port,
                '时区': timezone,
                '数据目录': datadir
            }
        except Exception as e:
            logging.error(f"采集服务器信息失败: {e}")
        try:
            # Current connections (requires PROCESS privilege).
            cursor.execute("SHOW PROCESSLIST;")
            access_logs = cursor.fetchall()
        except Exception as e:
            logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
        try:
            # Log-related server variables.
            cursor.execute("SHOW VARIABLES LIKE '%log%'")
            log_configs = {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.warning("无法查看日志配置: " + str(e))
        try:
            # File/process privileges (example query; may need adjusting).
            cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
            file_privileges = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看文件权限: " + str(e))
        try:
            # Registered UDFs (potential privilege-escalation vector).
            cursor.execute("SELECT * FROM mysql.func")
            udf_info = cursor.fetchall()
        except Exception as e:
            logging.warning("无法查看 UDF 信息: " + str(e))
        # Database structure and sample rows.
        try:
            cursor.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor.fetchall()]
        except Exception as e:
            logging.error("获取数据库列表失败: " + str(e))
            databases = []
        for db in databases:
            if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
                continue  # skip system schemas
            try:
                # FIX: the original f-string literal was truncated/broken here;
                # identifiers are backtick-quoted since they cannot be bound
                # as parameters.
                cursor.execute(f"USE `{db}`")
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
            except Exception as e:
                logging.warning(f"跳过数据库 {db},原因:{e}")
                continue
            db_structure[db] = {}
            for table in tables:
                try:
                    # Column names for the table.
                    cursor.execute(f"DESCRIBE `{table}`")
                    columns = [col[0] for col in cursor.fetchall()]
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
                    continue
                try:
                    # First 5 rows as a data sample.
                    cursor.execute(f"SELECT * FROM `{db}`.`{table}` LIMIT 5")
                    samples = cursor.fetchall()
                except Exception as e:
                    logging.warning(f"获取表 {db}.{table} 样本数据失败: {e}")
                    samples = []
                db_structure[db][table] = {
                    "columns": columns,
                    "samples": samples
                }
    return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
---
优化建议
1. 异常处理和重试机制
对调用 OpenAI API 增加异常捕获、超时处理及重试机制,保证在网络波动时依然能获得结果。
对数据库查询增加更多异常捕获,避免因单个查询失败导致整个流程中断。
2. 上下文管理和资源释放
使用 with 上下文管理器来管理数据库连接和 Excel Writer,确保资源在异常情况下也能正确释放。
3. 数据格式和列名优化
对导出的日志配置、访问记录等数据进行格式整理,添加列标题,使 Excel 文件更具可读性。
考虑对 sheet 名称冲突或长度进行更严格处理。
4. 参数化查询和安全性
对 SQL 语句使用参数化查询(如果涉及动态参数),以降低 SQL 注入风险(尽管此处主要用于采集数据)。
5. 日志记录
增加日志记录模块,将关键步骤和异常写入日志文件,便于后续排查问题。
---
改进后的代码
import pymysql
import openai
import json
import pandas as pd
import time
import logging
# ============ Configuration ============
# Connection settings for the target MySQL server.
# NOTE(review): credentials are hard-coded placeholders — move them to
# environment variables or a separate config file before real use.
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'yourpassword',
    'port': 3306,
    'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'  # OpenAI API key (placeholder)
MODEL_NAME = 'gpt-4'  # chat model used for the analysis requests
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'  # path of the Excel report
MAX_RETRY = 3  # number of retry attempts for OpenAI API calls
# Root logger: INFO level with a timestamped single-line format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== Database connection ======
def connect_db():
    """Open a MySQL connection using the module-level DB_CONFIG.

    Returns the live pymysql connection on success; on any failure the
    error is logged and None is returned instead of raising.
    """
    try:
        connection = pymysql.connect(**DB_CONFIG)
    except Exception as e:
        logging.error(f"数据库连接失败: {e}")
        return None
    logging.info("数据库连接成功")
    return connection
# ====== 数据采集 ======
def collect_db_info(conn):
"""枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置等"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
# 获取数据库结构和样本数据
try:
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
try:
cursor.execute(f"USE
{db}")cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE
{table}")columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue