数据库性能是系统的阿克琉斯之踵
再牛的AI代码,遇到慢查询也会拖垮整个系统。大部分开发者写CRUD没问题,但遇到索引优化、N+1问题、连接池配置、分库分表就头大。
这篇文章用MonkeyCode生成SQL优化方案,从实战角度解决最常见的高性能数据库问题。
场景1:索引优化(最常见性能杀手)
问题SQL
-- 查询用户订单,没有索引,全表扫描
SELECT * FROM orders
WHERE user_id = 12345
AND status = 'shipped'
AND created_at > '2025-01-01'
ORDER BY created_at DESC
LIMIT 20;
给MonkeyCode的Prompt
分析以下SQL的性能问题,给出:
1. 缺失的索引及创建语句
2. 索引覆盖分析(covering index)
3. EXPLAIN计划解读
4. 是否需要分区(按created_at)
5. 如果表超过1000万行,给出分表方案
表结构:
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
status VARCHAR(20) NOT NULL,
total DECIMAL(10,2),
created_at DATETIME NOT NULL,
updated_at DATETIME,
INDEX idx_user (user_id) -- 现有索引
);
MonkeyCode生成的优化方案
-- 1. 最优复合索引(覆盖查询的所有WHERE/ORDER BY)
CREATE INDEX idx_orders_user_status_created
ON orders (user_id, status, created_at DESC);
-- 2. 如果经常只查user_id + status(不需要total字段)
-- 建覆盖索引避免回表
CREATE INDEX idx_covering
ON orders (user_id, status, created_at, id)
INCLUDE (total); -- PostgreSQL语法;MySQL用(..., total)
-- 3. 定期检查索引使用率(MySQL 8.0+)
SELECT
TABLE_NAME,
INDEX_NAME,
ROWS_READ,
ROWS_UPDATED
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE TABLE_NAME = 'orders'
AND INDEX_NAME IS NOT NULL;
-- 4. 删除未使用索引(节省写入开销)
-- ALTER TABLE orders DROP INDEX idx_user; -- 确认后再删!
性能对比:
| 方案 | 扫描行数 | 执行时间 |
|---|---|---|
| 无索引 | 800万行 | 12.3秒 |
| idx_user单列 | 3.2万行 | 0.8秒 |
| 复合索引(user_id,status,created_at) | 142行 | 0.003秒 |
配套Python查询代码优化
# optimized_query.py - MonkeyCode生成
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import time
class OrderService:
def __init__(self, db_url: str):
self.engine = create_engine(
db_url,
pool_size=20, # 连接池大小
max_overflow=10, # 超出pool_size后最多创建多少连接
pool_pre_ping=True, # 连接前ping,防止使用断开的连接
pool_recycle=3600, # 连接回收(防止MySQL wait_timeout)
)
self.Session = sessionmaker(bind=self.engine)
def get_user_orders(self, user_id: int, status: str = None, page: int = 1, per_page: int = 20) -> dict:
"""获取用户订单(优化版)"""
offset = (page - 1) * per_page
# 使用参数化查询 + 索引提示
sql = """
SELECT
id, user_id, status, total, created_at
FROM orders USE INDEX (idx_orders_user_status_created)
WHERE user_id = :user_id
AND (:status IS NULL OR status = :status)
AND created_at > :since
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset
"""
since = '2025-01-01' # 只查最近数据
with self.engine.connect() as conn:
result = conn.execute(text(sql), {
'user_id': user_id,
'status': status,
'since': since,
'limit': per_page,
'offset': offset
})
orders = [dict(row) for row in result.mappings()]
# 用同一连接查总数(避免两次连接开销)
count_sql = """
SELECT COUNT(*) as total
FROM orders
WHERE user_id = :user_id
AND (:status IS NULL OR status = :status)
AND created_at > :since
"""
total = conn.execute(text(count_sql), {
'user_id': user_id,
'status': status,
'since': since
}).scalar()
return {
'orders': orders,
'pagination': {
'page': page,
'per_page': per_page,
'total': total,
'total_pages': (total + per_page - 1) // per_page
}
}
def get_orders_by_ids(self, order_ids: list) -> list:
"""批量查询(避免N+1)"""
if not order_ids:
return []
# 用IN查询,确保使用索引
placeholders = ','.join([':id' + str(i) for i in range(len(order_ids))])
params = {f'id{i}': oid for i, oid in enumerate(order_ids)}
sql = f"""
SELECT * FROM orders
WHERE id IN ({placeholders})
ORDER BY FIELD(id, {placeholders})
"""
# 注意:MySQL的FIELD函数保持传入顺序
with self.engine.connect() as conn:
result = conn.execute(text(sql), params)
return [dict(row) for row in result.mappings()]
# 使用
service = OrderService('mysql+pymysql://user:pass@localhost:3306/mydb')
result = service.get_user_orders(user_id=12345, status='shipped', page=1)
print(f"查询到 {len(result['orders'])} 条订单,共 {result['pagination']['total']} 条")
场景2:N+1问题(ORM性能杀手)
问题代码
# N+1问题示例
users = session.query(User).filter(User.active == True).all()
for user in users: # N次查询
orders = session.query(Order).filter(Order.user_id == user.id).all()
print(f"{user.name}: {len(orders)} orders")
MonkeyCode生成的解决方案
# optimize_nplus1.py - MonkeyCode生成
from sqlalchemy.orm import joinedload, selectinload, contains_eager
from sqlalchemy import func
class UserService:
def get_users_with_orders(self, page: int = 1, per_page: int = 20):
"""解决N+1:用JOIN预加载"""
# 方案1:joinedload(适合一对一,小结果集)
users = (
session.query(User)
.options(joinedload(User.orders)) # 一条SQL搞定
.filter(User.active == True)
.offset((page-1) * per_page)
.limit(per_page)
.all()
)
# 方案2:selectinload(适合一对多,大结果集)
# 会执行2条SQL,但比N+1好
users = (
session.query(User)
.options(selectinload(User.orders))
.filter(User.active == True)
.all()
)
return users
def get_users_with_order_stats(self):
"""用子查询聚合,避免JOIN后的笛卡尔积"""
from sqlalchemy import subquery, select
# 子查询:每个用户的订单统计
order_stats = (
select([
Order.user_id,
func.count(Order.id).label('order_count'),
func.sum(Order.total).label('total_spent')
])
.group_by(Order.user_id)
.alias('order_stats')
)
# JOIN子查询
users = (
session.query(User, order_stats.c.order_count, order_stats.c.total_spent)
.outerjoin(order_stats, User.id == order_stats.c.user_id)
.filter(User.active == True)
.all()
)
return users
# 性能对比
def benchmark_nplus1():
import time
# 坏方法:N+1
start = time.time()
users = session.query(User).limit(100).all()
for u in users:
_ = len(u.orders) # 触发100次查询
bad_time = time.time() - start
# 好方法:joinedload
start = time.time()
users = session.query(User).options(joinedload(User.orders)).limit(100).all()
for u in users:
_ = len(u.orders) # 不触发新查询
good_time = time.time() - start
print(f"N+1: {bad_time:.3f}s")
print(f"JOIN preload: {good_time:.3f}s")
print(f"Speedup: {bad_time/good_time:.1f}x")
场景3:连接池配置(高并发必调)
MonkeyCode生成的连接池监控
# connection_pool_monitor.py - MonkeyCode生成
from sqlalchemy import event
from sqlalchemy.pool import Pool
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 监听连接池事件
@event.listens_for(Pool, "checkout")
def on_checkout(dbapi_conn, connection_rec, connection_proxy):
logger.info(f"Checkout connection {id(dbapi_conn)}")
@event.listens_for(Pool, "checkin")
def on_checkin(dbapi_conn, connection_rec):
logger.info(f"Checkin connection {id(dbapi_conn)}")
@event.listens_for(Pool, "connect")
def on_connect(dbapi_conn, connection_rec):
logger.info(f"New connection created: {id(dbapi_conn)}")
class PoolMonitor:
"""监控连接池状态"""
def __init__(self, engine):
self.engine = engine
self.checkout_count = 0
self.checkin_count = 0
event.listen(engine.pool, "checkout", self._on_checkout)
event.listen(engine.pool, "checkin", self._on_checkin)
def _on_checkout(self, dbapi_conn, connection_rec, connection_proxy):
self.checkout_count += 1
def _on_checkin(self, dbapi_conn, connection_rec):
self.checkin_count += 1
def status(self) -> dict:
pool = self.engine.pool
return {
'checked_out': self.checkout_count - self.checkin_count,
'pool_size': pool.size(),
'connections_in_use': pool.checked_out(),
'overflow': pool.overflow(),
'total_checkouts': self.checkout_count,
}
def print_status(self):
status = self.status()
print(f"连接池状态:")
print(f" 使用中: {status['connections_in_use']}")
print(f" 池大小: {status['pool_size']}")
print(f" 溢出: {status['overflow']}")
print(f" 总检出次数: {status['total_checkouts']}")
# 使用
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://user:pass@localhost/mydb',
pool_size=10, max_overflow=20)
monitor = PoolMonitor(engine)
# 运行一段时间后查看
import time; time.sleep(60)
monitor.print_status()
连接池配置公式(经验值):
pool_size = CPU核心数 * 2
max_overflow = pool_size * 2
pool_recycle = 3600 # MySQL默认wait_timeout=8小时
场景4:分库分表(千万级数据)
分表策略(按user_id哈希)
# sharding.py - MonkeyCode生成
import hashlib
from sqlalchemy import create_engine, Table, MetaData, select
from sqlalchemy.orm import sessionmaker
class OrderSharding:
"""订单分表(按user_id哈希分16张表)"""
def __init__(self, db_urls: list):
self.num_shards = 16
self.engines = [
create_engine(url, pool_size=5, max_overflow=10)
for url in db_urls
]
self.sessions = [sessionmaker(bind=engine)() for engine in self.engines]
def _get_shard_index(self, user_id: int) -> int:
"""计算分片索引"""
return user_id % self.num_shards
def _get_table_name(self, user_id: int) -> str:
"""获取分表名"""
shard = self._get_shard_index(user_id)
return f'orders_{shard:02d}' # orders_00, orders_01, ...
def create_order(self, user_id: int, total: float) -> dict:
"""创建订单(写入对应分表)"""
table_name = self._get_table_name(user_id)
shard_idx = self._get_shard_index(user_id)
session = self.sessions[shard_idx]
# 动态创建表对象
metadata = MetaData()
order_table = Table(table_name, metadata, autoload_with=self.engines[shard_idx])
# 插入
stmt = order_table.insert().values(
user_id=user_id,
total=total,
status='pending',
created_at=func.now()
)
result = session.execute(stmt)
session.commit()
return {'order_id': result.inserted_primary_key[0], 'shard': shard_idx}
def get_user_orders(self, user_id: int, page: int = 1, per_page: int = 20):
"""查询用户订单(只查对应分表)"""
table_name = self._get_table_name(user_id)
shard_idx = self._get_shard_index(user_id)
session = self.sessions[shard_idx]
metadata = MetaData()
order_table = Table(table_name, metadata, autoload_with=self.engines[shard_idx])
query = (
select(order_table)
.where(order_table.c.user_id == user_id)
.order_by(order_table.c.created_at.desc())
.limit(per_page)
.offset((page-1) * per_page)
)
result = session.execute(query)
return [dict(row) for row in result.mappings()]
# 使用
sharding = OrderSharding([
'mysql://user:pass@shard1:3306/mydb',
'mysql://user:pass@shard2:3306/mydb',
# ... 16个分片
])
order = sharding.create_order(user_id=12345, total=99.99)
print(f"订单创建在分片 {order['shard']}")
场景5:读写分离(高可用架构)
# read_write_splitting.py - MonkeyCode生成
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import random
class ReadWriteSplitting:
"""读写分离:写主库,读从库"""
def __init__(self, master_url: str, slave_urls: list):
self.master_engine = create_engine(master_url, pool_size=20)
self.slave_engines = [
create_engine(url, pool_size=10)
for url in slave_urls
]
self.MasterSession = sessionmaker(bind=self.master_engine)
self.slave_sessions = [
sessionmaker(bind=engine)()
for engine in self.slave_engines
]
def get_read_session(self):
"""随机选择一个从库(简单的负载均衡)"""
return random.choice(self.slave_sessions)
def get_write_session(self):
"""返回主库session"""
return self.MasterSession()
def transaction(self, func):
"""事务:强制用主库"""
session = self.get_write_session()
try:
result = func(session)
session.commit()
return result
except:
session.rollback()
raise
finally:
session.close()
# 使用
db = ReadWriteSplitting(
master_url='mysql://user:pass@master:3306/mydb',
slave_urls=[
'mysql://user:pass@slave1:3306/mydb',
'mysql://user:pass@slave2:3306/mydb',
]
)
# 写操作
db.transaction(lambda s: s.execute('INSERT INTO orders ...'))
# 读操作
read_session = db.get_read_session()
users = read_session.query(User).all()
性能优化检查清单
用MonkeyCode生成数据库优化方案时,把这个清单给它:
## 数据库性能检查清单
- [ ] 所有WHERE条件字段都有索引
- [ ] 复合索引顺序符合最左前缀原则
- [ ] 没有SELECT *,只查需要的列
- [ ] 分页用LIMIT+OFSSET,大偏移量用游标分页
- [ ] 没有N+1问题(用EXPLAIN确认)
- [ ] 连接池配置合理(pool_size, max_overflow)
- [ ] 长查询加了超时(socket_timeout)
- [ ] 定期ANALYZE TABLE更新统计信息
- [ ] 大表有归档策略(冷数据迁移)
- [ ] 读写分离已配置(如果QPS > 5000)
MonkeyCode在数据库优化上的价值:它知道索引的最佳实践,能生成EXPLAIN解读,还能给出分表分库的具体代码。但索引是否能覆盖所有查询场景,必须人工验证。
文章摘自:https://www.cnblogs.com/jaryn/p/20218926
