下面列出一些可以进一步优化脚本的建议,并提供一个改进后的代码版本供参考:
---
优化建议
1. 异常处理和重试机制
对调用 OpenAI API 增加异常捕获、超时处理及重试机制,保证在网络波动时依然能获得结果。
对数据库查询增加更多异常捕获,避免因单个查询失败导致整个流程中断。
2. 上下文管理和资源释放
使用 with 上下文管理器来管理数据库连接和 Excel Writer,确保资源在异常情况下也能正确释放。
3. 数据格式和列名优化
对导出的日志配置、访问记录等数据进行格式整理,添加列标题,使 Excel 文件更具可读性。
考虑对 sheet 名称冲突或长度进行更严格处理。
4. 参数化查询和安全性
对 SQL 语句使用参数化查询(如果涉及动态参数),以降低 SQL 注入风险(尽管此处主要用于采集数据)。
5. 日志记录
增加日志记录模块,将关键步骤和异常写入日志文件,便于后续排查问题。
---
改进后的代码
import pymysql
import openai
import json
import pandas as pd
import time
import logging
# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置等"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
# 获取数据库结构和样本数据
try:
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
try:
cursor.execute(f"USE
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE
columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue
---
优化建议
1. 异常处理和重试机制
对调用 OpenAI API 增加异常捕获、超时处理及重试机制,保证在网络波动时依然能获得结果。
对数据库查询增加更多异常捕获,避免因单个查询失败导致整个流程中断。
2. 上下文管理和资源释放
使用 with 上下文管理器来管理数据库连接和 Excel Writer,确保资源在异常情况下也能正确释放。
3. 数据格式和列名优化
对导出的日志配置、访问记录等数据进行格式整理,添加列标题,使 Excel 文件更具可读性。
考虑对 sheet 名称冲突或长度进行更严格处理。
4. 参数化查询和安全性
对 SQL 语句使用参数化查询(如果涉及动态参数),以降低 SQL 注入风险(尽管此处主要用于采集数据)。
5. 日志记录
增加日志记录模块,将关键步骤和异常写入日志文件,便于后续排查问题。
---
改进后的代码
import pymysql
import openai
import json
import pandas as pd
import time
import logging
# ============ 配置区 ============
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'yourpassword',
'port': 3306,
'charset': 'utf8mb4'
}
OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
MODEL_NAME = 'gpt-4'
OUTPUT_FILE = 'sensitive_data_analysis.xlsx'
MAX_RETRY = 3 # OpenAI API 重试次数
# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# ====== 数据库连接 ======
def connect_db():
"""建立数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
return None
# ====== 数据采集 ======
def collect_db_info(conn):
"""枚举所有数据库、数据表、字段、样本记录、服务器信息、访问记录、日志配置等"""
logging.info("正在采集数据库信息...")
db_structure = {}
server_info = {}
access_logs = []
log_configs = {}
file_privileges = []
udf_info = []
with conn.cursor() as cursor:
try:
# 获取服务器信息
cursor.execute("SELECT VERSION(), @@hostname, @@port, @@system_time_zone, @@datadir;")
version, hostname, port, timezone, datadir = cursor.fetchone()
server_info = {
'版本': version,
'主机名': hostname,
'端口': port,
'时区': timezone,
'数据目录': datadir
}
except Exception as e:
logging.error(f"采集服务器信息失败: {e}")
try:
# 获取访问记录(需要权限)
cursor.execute("SHOW PROCESSLIST;")
access_logs = cursor.fetchall()
except Exception as e:
logging.warning("当前用户无法查看访问记录 (SHOW PROCESSLIST): " + str(e))
try:
# 日志配置
cursor.execute("SHOW VARIABLES LIKE '%log%'")
log_configs = {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logging.warning("无法查看日志配置: " + str(e))
try:
# 文件权限(此查询仅为示例,实际环境中可能需要调整)
cursor.execute("SELECT * FROM mysql.db WHERE Db='%' AND (File_priv='Y' OR Process_priv='Y')")
file_privileges = cursor.fetchall()
except Exception as e:
logging.warning("无法查看文件权限: " + str(e))
try:
# UDF 提权信息
cursor.execute("SELECT * FROM mysql.func")
udf_info = cursor.fetchall()
except Exception as e:
logging.warning("无法查看 UDF 信息: " + str(e))
# 获取数据库结构和样本数据
try:
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
except Exception as e:
logging.error("获取数据库列表失败: " + str(e))
databases = []
for db in databases:
if db in ('information_schema', 'performance_schema', 'mysql', 'sys'):
continue # 跳过系统库
try:
cursor.execute(f"USE
{db}")cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
except Exception as e:
logging.warning(f"跳过数据库 {db},原因:{e}")
continue
db_structure[db] = {}
for table in tables:
try:
# 获取字段信息
cursor.execute(f"DESCRIBE
{table}")columns = [col[0] for col in cursor.fetchall()]
except Exception as e:
logging.warning(f"获取表 {db}.{table} 字段信息失败: {e}")
continue