一、环境准备与驱动安装
1. 选择合适的Python驱动
OceanBase支持MySQL协议,推荐使用以下两种主流驱动:
- PyMySQL(Python 3.x):纯Python实现,无需编译依赖
pip install pymysql
- mysqlclient(Python 2.x/3.x):基于MySQL C API,性能更优但需编译环境
# Linux安装
sudo apt-get install python3-dev libmysqlclient-dev
pip install mysqlclient
# Windows安装(预编译包)
pip install https://download.lfd.uci.edu/pythonlibs/.../mysqlclient-1.4.6-cp39-cp39-win_amd64.whl
2. 验证安装
import pymysql
print(pymysql.__version__) # 应显示版本号
二、核心连接配置
1. 连接参数说明
参数名 | 描述 | 示例值 |
host | 数据库地址(支持负载均衡) | 'obcluster.example.com' |
port | 连接端口(MySQL模式默认3306) | 3306 |
user | 租户账号(格式:user@tenant) | 'admin@mysql_tenant' |
password | 账号密码 | 'secure_password' |
database | 目标数据库名称 | 'test_db' |
charset | 字符集 | 'utf8mb4' |
connect_timeout | 连接超时时间(秒) | 10 |
2. 基础连接示例(PyMySQL)
import pymysql
config = {
'host': 'obcluster.example.com',
'port': 3306,
'user': 'admin@mysql_tenant',
'password': 'secure_password',
'database': 'test_db',
'charset': 'utf8mb4',
'connect_timeout': 10
}
try:
conn = pymysql.connect(**config)
print("Connected successfully")
except pymysql.MySQLError as e:
print(f"Connection failed: {e}")
finally:
if 'conn' in locals():
conn.close()
三、数据操作与事务管理
1. 执行查询
with conn.cursor() as cursor:
sql = "SELECT * FROM employees WHERE department = %s"
cursor.execute(sql, ('IT',))
result = cursor.fetchall()
for row in result:
print(row)
2. 插入数据
with conn.cursor() as cursor:
sql = """
INSERT INTO employees (name, age, department)
VALUES (%s, %s, %s)
"""
data = [
('Alice', 30, 'HR'),
('Bob', 28, 'Finance')
]
cursor.executemany(sql, data)
conn.commit()
3. 事务处理
try:
with conn.cursor() as cursor:
# 开启事务
conn.begin()
# 执行操作1
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
# 执行操作2
cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
# 提交事务
conn.commit()
except Exception as e:
# 回滚事务
conn.rollback()
print(f"Transaction failed: {e}")
四、高级功能与优化
1. 参数化查询(防止SQL注入)
# 错误示例(直接拼接字符串)
username = "admin'; DROP TABLE users;--"
sql = f"SELECT * FROM users WHERE username = '{username}'"
# 正确示例(使用参数化)
sql = "SELECT * FROM users WHERE username = %s"
cursor.execute(sql, (username,))
2. 连接池配置(提升性能)
from sqlalchemy import create_engine
engine = create_engine(
'mysql+pymysql://admin@mysql_tenant:secure_password@obcluster.example.com:3306/test_db',
pool_size=10,
max_overflow=20,
pool_recycle=3600
)
with engine.connect() as connection:
result = connection.execute("SELECT * FROM employees")
for row in result:
print(row)
3. SSL加密连接
# 下载证书后配置
ssl_config = {
'ca': '/path/to/ca.pem',
'cert': '/path/to/client-cert.pem',
'key': '/path/to/client-key.pem'
}
conn = pymysql.connect(ssl=ssl_config, **config)
五、异常处理与最佳实践
1. 常见异常类型
异常类 | 描述 | 处理建议 |
OperationalError | 连接/执行错误 | 检查网络及权限 |
ProgrammingError | SQL语法错误 | 验证SQL语句正确性 |
IntegrityError | 数据完整性约束冲突 | 检查外键/唯一性约束 |
2. 错误处理模板
try:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM non_existent_table")
except pymysql.ProgrammingError as e:
print(f"SQL error: {e}")
except pymysql.OperationalError as e:
print(f"Connection error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
六、性能优化建议
- 批量操作:使用executemany()代替多次execute()
# 低效方式
for data in large_data_set:
cursor.execute("INSERT INTO table VALUES (%s)", data)
# 高效方式
cursor.executemany("INSERT INTO table VALUES (%s)", large_data_set)
- 结果流式处理:使用SSCursor处理大数据量
from pymysql.cursors import SSCursor
with conn.cursor(SSCursor) as cursor:
cursor.execute("SELECT * FROM large_table")
for row in cursor:
process_row(row)
- 连接池调优:根据并发量调整pool_size和max_overflow
# 高并发场景配置
engine = create_engine(..., pool_size=20, max_overflow=50)
七、运维与监控
- 连接状态监控
print(f"Current connections: {engine.pool.status()}")
- 慢查询分析
# 开启慢查询日志
conn.execute("SET SESSION long_query_time = 1")
conn.execute("SET SESSION log_slow_queries = 1")
- 性能分析工具
# 使用pt-query-digest分析慢查询日志
pt-query-digest slow.log > slow_analysis.txt
八、安全最佳实践
- 权限最小化:为应用账户授予最小权限
GRANT SELECT, INSERT, UPDATE ON test_db.* TO 'app_user'@'%';
- 密码管理:使用环境变量或配置文件存储密码
import os
password = os.environ.get('OB_PASSWORD')
- 审计日志:开启审计功能记录操作
ALTER SYSTEM SET audit_log_level = 1;
九、常见问题排查
问题现象 | 可能原因 | 解决方案 |
连接超时 | 网络问题/端口错误 | 检查防火墙及端口配置 |
权限不足 | 用户权限不足 | 确认GRANT语句正确性 |
数据乱码 | 字符集不匹配 | 设置正确的charset参数 |
连接泄漏 | 未正确关闭连接 | 使用with上下文管理器 |
通过以上步骤,您可以在Python应用中安全、高效地访问OceanBase数据库。根据具体业务场景,可灵活调整连接参数、事务策略和性能优化措施。