MonkeyCode数据库优化实战:从慢查询到百万QPS

数据库性能是系统的阿克琉斯之踵

再牛的AI代码,遇到慢查询也会拖垮整个系统。大部分开发者写CRUD没问题,但遇到索引优化、N+1问题、连接池配置、分库分表就头大。

这篇文章用MonkeyCode生成SQL优化方案,从实战角度解决最常见的高性能数据库问题。

场景1:索引优化(最常见性能杀手)

问题SQL

-- 查询用户订单,没有索引,全表扫描
SELECT * FROM orders 
WHERE user_id = 12345 
  AND status = 'shipped' 
  AND created_at > '2025-01-01'
ORDER BY created_at DESC 
LIMIT 20;

给MonkeyCode的Prompt

分析以下SQL的性能问题,给出:
1. 缺失的索引及创建语句
2. 索引覆盖分析(covering index)
3. EXPLAIN计划解读
4. 是否需要分区(按created_at)
5. 如果表超过1000万行,给出分表方案

表结构:
CREATE TABLE orders (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  user_id BIGINT NOT NULL,
  status VARCHAR(20) NOT NULL,
  total DECIMAL(10,2),
  created_at DATETIME NOT NULL,
  updated_at DATETIME,
  INDEX idx_user (user_id)  -- 现有索引
);

MonkeyCode生成的优化方案

-- 1. 最优复合索引(覆盖查询的所有WHERE/ORDER BY)
CREATE INDEX idx_orders_user_status_created 
ON orders (user_id, status, created_at DESC);

-- 2. 如果经常只查user_id + status(不需要total字段)
-- 建覆盖索引避免回表
CREATE INDEX idx_covering 
ON orders (user_id, status, created_at, id) 
INCLUDE (total);  -- PostgreSQL语法;MySQL用(..., total)

-- 3. 定期检查索引使用率(MySQL 8.0+)
SELECT 
  TABLE_NAME,
  INDEX_NAME,
  ROWS_READ,
  ROWS_UPDATED
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE TABLE_NAME = 'orders'
  AND INDEX_NAME IS NOT NULL;

-- 4. 删除未使用索引(节省写入开销)
-- ALTER TABLE orders DROP INDEX idx_user;  -- 确认后再删!

性能对比

方案 扫描行数 执行时间
无索引 800万行 12.3秒
idx_user单列 3.2万行 0.8秒
复合索引(user_id,status,created_at) 142行 0.003秒

配套Python查询代码优化

# optimized_query.py - MonkeyCode生成
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import time

class OrderService:
    def __init__(self, db_url: str):
        self.engine = create_engine(
            db_url,
            pool_size=20,           # 连接池大小
            max_overflow=10,        # 超出pool_size后最多创建多少连接
            pool_pre_ping=True,     # 连接前ping,防止使用断开的连接
            pool_recycle=3600,      # 连接回收(防止MySQL wait_timeout)
        )
        self.Session = sessionmaker(bind=self.engine)
    
    def get_user_orders(self, user_id: int, status: str = None, page: int = 1, per_page: int = 20) -> dict:
        """获取用户订单(优化版)"""
        offset = (page - 1) * per_page
        
        # 使用参数化查询 + 索引提示
        sql = """
            SELECT 
                id, user_id, status, total, created_at
            FROM orders USE INDEX (idx_orders_user_status_created)
            WHERE user_id = :user_id
              AND (:status IS NULL OR status = :status)
              AND created_at > :since
            ORDER BY created_at DESC
            LIMIT :limit OFFSET :offset
        """
        
        since = '2025-01-01'  # 只查最近数据
        
        with self.engine.connect() as conn:
            result = conn.execute(text(sql), {
                'user_id': user_id,
                'status': status,
                'since': since,
                'limit': per_page,
                'offset': offset
            })
            
            orders = [dict(row) for row in result.mappings()]
            
            # 用同一连接查总数(避免两次连接开销)
            count_sql = """
                SELECT COUNT(*) as total 
                FROM orders 
                WHERE user_id = :user_id
                  AND (:status IS NULL OR status = :status)
                  AND created_at > :since
            """
            total = conn.execute(text(count_sql), {
                'user_id': user_id,
                'status': status,
                'since': since
            }).scalar()
        
        return {
            'orders': orders,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total,
                'total_pages': (total + per_page - 1) // per_page
            }
        }
    
    def get_orders_by_ids(self, order_ids: list) -> list:
        """批量查询(避免N+1)"""
        if not order_ids:
            return []
        
        # 用IN查询,确保使用索引
        placeholders = ','.join([':id' + str(i) for i in range(len(order_ids))])
        params = {f'id{i}': oid for i, oid in enumerate(order_ids)}
        
        sql = f"""
            SELECT * FROM orders 
            WHERE id IN ({placeholders})
            ORDER BY FIELD(id, {placeholders})
        """
        # 注意:MySQL的FIELD函数保持传入顺序
        
        with self.engine.connect() as conn:
            result = conn.execute(text(sql), params)
            return [dict(row) for row in result.mappings()]

# 使用
service = OrderService('mysql+pymysql://user:pass@localhost:3306/mydb')
result = service.get_user_orders(user_id=12345, status='shipped', page=1)
print(f"查询到 {len(result['orders'])} 条订单,共 {result['pagination']['total']} 条")

场景2:N+1问题(ORM性能杀手)

问题代码

# N+1问题示例
users = session.query(User).filter(User.active == True).all()
for user in users:  # N次查询
    orders = session.query(Order).filter(Order.user_id == user.id).all()
    print(f"{user.name}: {len(orders)} orders")

MonkeyCode生成的解决方案

# optimize_nplus1.py - MonkeyCode生成
from sqlalchemy.orm import joinedload, selectinload, contains_eager
from sqlalchemy import func

class UserService:
    def get_users_with_orders(self, page: int = 1, per_page: int = 20):
        """解决N+1:用JOIN预加载"""
        
        # 方案1:joinedload(适合一对一,小结果集)
        users = (
            session.query(User)
            .options(joinedload(User.orders))  # 一条SQL搞定
            .filter(User.active == True)
            .offset((page-1) * per_page)
            .limit(per_page)
            .all()
        )
        
        # 方案2:selectinload(适合一对多,大结果集)
        # 会执行2条SQL,但比N+1好
        users = (
            session.query(User)
            .options(selectinload(User.orders))
            .filter(User.active == True)
            .all()
        )
        
        return users
    
    def get_users_with_order_stats(self):
        """用子查询聚合,避免JOIN后的笛卡尔积"""
        from sqlalchemy import subquery, select
        
        # 子查询:每个用户的订单统计
        order_stats = (
            select([
                Order.user_id,
                func.count(Order.id).label('order_count'),
                func.sum(Order.total).label('total_spent')
            ])
            .group_by(Order.user_id)
            .alias('order_stats')
        )
        
        # JOIN子查询
        users = (
            session.query(User, order_stats.c.order_count, order_stats.c.total_spent)
            .outerjoin(order_stats, User.id == order_stats.c.user_id)
            .filter(User.active == True)
            .all()
        )
        
        return users

# 性能对比
def benchmark_nplus1():
    import time
    
    # 坏方法:N+1
    start = time.time()
    users = session.query(User).limit(100).all()
    for u in users:
        _ = len(u.orders)  # 触发100次查询
    bad_time = time.time() - start
    
    # 好方法:joinedload
    start = time.time()
    users = session.query(User).options(joinedload(User.orders)).limit(100).all()
    for u in users:
        _ = len(u.orders)  # 不触发新查询
    good_time = time.time() - start
    
    print(f"N+1: {bad_time:.3f}s")
    print(f"JOIN preload: {good_time:.3f}s")
    print(f"Speedup: {bad_time/good_time:.1f}x")

场景3:连接池配置(高并发必调)

MonkeyCode生成的连接池监控

# connection_pool_monitor.py - MonkeyCode生成
from sqlalchemy import event
from sqlalchemy.pool import Pool
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 监听连接池事件
@event.listens_for(Pool, "checkout")
def on_checkout(dbapi_conn, connection_rec, connection_proxy):
    logger.info(f"Checkout connection {id(dbapi_conn)}")

@event.listens_for(Pool, "checkin")
def on_checkin(dbapi_conn, connection_rec):
    logger.info(f"Checkin connection {id(dbapi_conn)}")

@event.listens_for(Pool, "connect")
def on_connect(dbapi_conn, connection_rec):
    logger.info(f"New connection created: {id(dbapi_conn)}")

class PoolMonitor:
    """监控连接池状态"""
    def __init__(self, engine):
        self.engine = engine
        self.checkout_count = 0
        self.checkin_count = 0
        
        event.listen(engine.pool, "checkout", self._on_checkout)
        event.listen(engine.pool, "checkin", self._on_checkin)
    
    def _on_checkout(self, dbapi_conn, connection_rec, connection_proxy):
        self.checkout_count += 1
    
    def _on_checkin(self, dbapi_conn, connection_rec):
        self.checkin_count += 1
    
    def status(self) -> dict:
        pool = self.engine.pool
        return {
            'checked_out': self.checkout_count - self.checkin_count,
            'pool_size': pool.size(),
            'connections_in_use': pool.checked_out(),
            'overflow': pool.overflow(),
            'total_checkouts': self.checkout_count,
        }
    
    def print_status(self):
        status = self.status()
        print(f"连接池状态:")
        print(f"  使用中: {status['connections_in_use']}")
        print(f"  池大小: {status['pool_size']}")
        print(f"  溢出: {status['overflow']}")
        print(f"  总检出次数: {status['total_checkouts']}")

# 使用
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://user:pass@localhost/mydb', 
                      pool_size=10, max_overflow=20)
monitor = PoolMonitor(engine)

# 运行一段时间后查看
import time; time.sleep(60)
monitor.print_status()

连接池配置公式(经验值):

pool_size = CPU核心数 * 2
max_overflow = pool_size * 2
pool_recycle = 3600  # MySQL默认wait_timeout=8小时

场景4:分库分表(千万级数据)

分表策略(按user_id哈希)

# sharding.py - MonkeyCode生成
import hashlib
from sqlalchemy import create_engine, Table, MetaData, select
from sqlalchemy.orm import sessionmaker

class OrderSharding:
    """订单分表(按user_id哈希分16张表)"""
    def __init__(self, db_urls: list):
        self.num_shards = 16
        self.engines = [
            create_engine(url, pool_size=5, max_overflow=10)
            for url in db_urls
        ]
        self.sessions = [sessionmaker(bind=engine)() for engine in self.engines]
    
    def _get_shard_index(self, user_id: int) -> int:
        """计算分片索引"""
        return user_id % self.num_shards
    
    def _get_table_name(self, user_id: int) -> str:
        """获取分表名"""
        shard = self._get_shard_index(user_id)
        return f'orders_{shard:02d}'  # orders_00, orders_01, ...
    
    def create_order(self, user_id: int, total: float) -> dict:
        """创建订单(写入对应分表)"""
        table_name = self._get_table_name(user_id)
        shard_idx = self._get_shard_index(user_id)
        session = self.sessions[shard_idx]
        
        # 动态创建表对象
        metadata = MetaData()
        order_table = Table(table_name, metadata, autoload_with=self.engines[shard_idx])
        
        # 插入
        stmt = order_table.insert().values(
            user_id=user_id,
            total=total,
            status='pending',
            created_at=func.now()
        )
        result = session.execute(stmt)
        session.commit()
        
        return {'order_id': result.inserted_primary_key[0], 'shard': shard_idx}
    
    def get_user_orders(self, user_id: int, page: int = 1, per_page: int = 20):
        """查询用户订单(只查对应分表)"""
        table_name = self._get_table_name(user_id)
        shard_idx = self._get_shard_index(user_id)
        session = self.sessions[shard_idx]
        
        metadata = MetaData()
        order_table = Table(table_name, metadata, autoload_with=self.engines[shard_idx])
        
        query = (
            select(order_table)
            .where(order_table.c.user_id == user_id)
            .order_by(order_table.c.created_at.desc())
            .limit(per_page)
            .offset((page-1) * per_page)
        )
        
        result = session.execute(query)
        return [dict(row) for row in result.mappings()]

# 使用
sharding = OrderSharding([
    'mysql://user:pass@shard1:3306/mydb',
    'mysql://user:pass@shard2:3306/mydb',
    # ... 16个分片
])

order = sharding.create_order(user_id=12345, total=99.99)
print(f"订单创建在分片 {order['shard']}")

场景5:读写分离(高可用架构)

# read_write_splitting.py - MonkeyCode生成
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import random

class ReadWriteSplitting:
    """读写分离:写主库,读从库"""
    def __init__(self, master_url: str, slave_urls: list):
        self.master_engine = create_engine(master_url, pool_size=20)
        self.slave_engines = [
            create_engine(url, pool_size=10)
            for url in slave_urls
        ]
        
        self.MasterSession = sessionmaker(bind=self.master_engine)
        self.slave_sessions = [
            sessionmaker(bind=engine)()
            for engine in self.slave_engines
        ]
    
    def get_read_session(self):
        """随机选择一个从库(简单的负载均衡)"""
        return random.choice(self.slave_sessions)
    
    def get_write_session(self):
        """返回主库session"""
        return self.MasterSession()
    
    def transaction(self, func):
        """事务:强制用主库"""
        session = self.get_write_session()
        try:
            result = func(session)
            session.commit()
            return result
        except:
            session.rollback()
            raise
        finally:
            session.close()

# 使用
db = ReadWriteSplitting(
    master_url='mysql://user:pass@master:3306/mydb',
    slave_urls=[
        'mysql://user:pass@slave1:3306/mydb',
        'mysql://user:pass@slave2:3306/mydb',
    ]
)

# 写操作
db.transaction(lambda s: s.execute('INSERT INTO orders ...'))

# 读操作
read_session = db.get_read_session()
users = read_session.query(User).all()

性能优化检查清单

用MonkeyCode生成数据库优化方案时,把这个清单给它:

## 数据库性能检查清单
- [ ] 所有WHERE条件字段都有索引
- [ ] 复合索引顺序符合最左前缀原则
- [ ] 没有SELECT *,只查需要的列
- [ ] 分页用LIMIT+OFSSET,大偏移量用游标分页
- [ ] 没有N+1问题(用EXPLAIN确认)
- [ ] 连接池配置合理(pool_size, max_overflow)
- [ ] 长查询加了超时(socket_timeout)
- [ ] 定期ANALYZE TABLE更新统计信息
- [ ] 大表有归档策略(冷数据迁移)
- [ ] 读写分离已配置(如果QPS > 5000)

MonkeyCode在数据库优化上的价值:它知道索引的最佳实践,能生成EXPLAIN解读,还能给出分表分库的具体代码。但索引是否能覆盖所有查询场景,必须人工验证。

文章摘自:https://www.cnblogs.com/jaryn/p/20218926