已经优化完毕!下面是新增了对日志配置、文件权限、UDF 提权等渗透途径分析的代码:
import pymysql
import openai
import json
import pandas as pd
# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
print("✅ 数据库连接成功")
return conn
except Exception as e:
print(f"❌ 数据库连接失败: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置等"""
print("🔍 正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
# 获取访问记录(需要权限)
try:
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except:
print("⚠️ 当前用户无法查看访问记录 (SHOW PROCESSLIST)")
# 日志配置
try:
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except:
print("⚠️ 无法查看日志配置")
# 文件权限
try:
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except:
print("⚠️ 无法查看文件权限")
# UDF 提权信息
try:
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except:
print("⚠️ 无法查看 UDF 信息")
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
cursor.execute(f"USE
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
db_structure[db] = {}
for table in tables:
# 获取字段信息
cursor.execute(f"DESCRIBE
columns = [col[0] for col in cursor.fetchall()]
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
samples = cursor.fetchall()
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析"""
openai.api_key = OPENAI_API_KEY
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
print("🧠 正在通过 OpenAI 分析...")
# 生成分析 prompt
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)
import pymysql
import openai
import json
import pandas as pd
# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
print("✅ 数据库连接成功")
return conn
except Exception as e:
print(f"❌ 数据库连接失败: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置等"""
print("🔍 正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
# 获取访问记录(需要权限)
try:
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except:
print("⚠️ 当前用户无法查看访问记录 (SHOW PROCESSLIST)")
# 日志配置
try:
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except:
print("⚠️ 无法查看日志配置")
# 文件权限
try:
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except:
print("⚠️ 无法查看文件权限")
# UDF 提权信息
try:
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except:
print("⚠️ 无法查看 UDF 信息")
# 获取数据库结构和样本数据
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
cursor.execute(f"USE
{db}")cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
db_structure[db] = {}
for table in tables:
# 获取字段信息
cursor.execute(f"DESCRIBE
{table}")columns = [col[0] for col in cursor.fetchall()]
# 获取前 5 条数据样本
cursor.execute(f"SELECT * FROM
{table} LIMIT 5")samples = cursor.fetchall()
db_structure[db][table] = {
"columns": columns,
"samples": samples
}
return db_structure, server_info, access_logs, log_configs, file_privileges, udf_info
# ====== OpenAI 分析 ======
def call_openai_api(prompt):
"""调用 OpenAI API 进行分析"""
openai.api_key = OPENAI_API_KEY
response = openai.ChatCompletion.create(
model=MODEL_NAME,
messages=[{"role": "system", "content": "你是一个数据库安全分析助手。"},
{"role": "user", "content": prompt}],
max_tokens=2000
)
return response['choices'][0]['message']['content'].strip()
def analyze_with_openai(data):
"""利用 OpenAI 分析数据库结构和渗透风险"""
print("🧠 正在通过 OpenAI 分析...")
# 生成分析 prompt
prompt = (
"以下是数据库结构、服务器信息、访问记录、日志配置、文件权限、UDF 信息,请识别可能的敏感信息和潜在的渗透风险,"
"包括但不限于:身份证号、手机号、邮箱、密码、IP 地址、端口、视频监控流地址、日志配置、文件读写权限、UDF 提权风险等,"
"输出格式:{'sensitive_fields': {...}, 'server_analysis': {...}, 'access_analysis': {...}}。\n\n"
f"数据如下:\n{json.dumps(data, ensure_ascii=False, indent=2)}"
)