MonkeyCode写消息队列:从零实现高可用异步架构

同步调用是微服务耦合的根源。消息队列解耦了服务,也救了你的响应时间。用MonkeyCode,从选型到实现一把梭。

为什么需要消息队列?

一个典型的电商下单流程,同步调用长这样:

# Synchronous style: creating an order waits for every downstream call to return.
@app.post("/orders")
async def create_order(req: CreateOrderRequest):
    """Create an order by calling each downstream service one after another.

    Every call is awaited before the next one starts, so the latencies add up
    (per the annotations: 50 + 200 + 100 + 80 + 30 = 460ms), and an exception
    from any single call fails the whole request.
    """
    await get_user(req.user_id)          # ~50ms
    await process_payment(req)           # ~200ms
    await deduct_inventory(req)          # ~100ms
    await send_notification(req)         # ~80ms
    await write_order_log(req)           # ~30ms
    return {"status": "ok"}  # ~460ms in total

用户点击“下单”后,要等460ms才能看到结果。而且任何一个下游服务挂了,下单就失败了。

引入消息队列后:

# Async style: the endpoint only persists the order and publishes an event,
# returning in ~25ms (~20ms DB write + ~5ms publish).
@app.post("/orders")
async def create_order(req: CreateOrderRequest):
    """Persist the order, announce it with an ``order.created`` event, and return.

    Downstream work (payment, inventory, notification) is left to consumers of
    the event, so the response reports ``"processing"`` rather than a final
    outcome.
    """
    order_id = await save_order(req)  # DB write, ~20ms
    await publish_event("order.created", {  # publish, ~5ms
        "order_id": order_id,
        "user_id": req.user_id,
        "amount": req.amount,
    })
    return {"order_id": order_id, "status": "processing"}  # ~25ms in total

下游的支付、扣库存、发通知全部异步处理,接口响应时间从约460ms降到约25ms,快了约18倍(注意:这是“受理”速度,订单的最终结果需要通过后续事件或查询获得)。

主流消息队列选型

MonkeyCode帮你选:

方案 适用场景 吞吐量 延迟
RabbitMQ 业务消息、延迟队列 中等(万级/秒) 毫秒级
Redis Streams 轻量场景、已有Redis 高(十万级/秒) 毫秒级
Kafka 日志流、大数据管道 极高(百万级/秒) 毫秒~秒级
Amazon SQS 云原生、无运维 高(标准队列近乎无上限) 秒级

实战一:RabbitMQ + FastAPI

安装配置

让MonkeyCode生成Docker配置:

为RabbitMQ生成docker-compose配置,包含管理界面,默认用户密码admin/pass
# docker-compose.yml
# The top-level `version` key is obsolete in the Compose Specification
# (Docker Compose v2 ignores it and prints a warning), so it is omitted.
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    # RabbitMQ stores its data under a directory named after the node name,
    # which defaults to the hostname. A fixed hostname lets a recreated
    # container find the data already in the rabbitmq_data volume.
    hostname: rabbitmq
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # management UI
    environment:
      # Demo credentials only; use secrets / env files outside local development.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: pass
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

volumes:
  rabbitmq_data:

启动后访问 http://localhost:15672,用admin/pass登录。

发布消息(订单服务)

# order-service/producer.py
import aio_pika
import json
from typing import Any

class MessageProducer:
    """Publish JSON messages to durable RabbitMQ queues through the default exchange.

    The connection and channel are opened lazily on the first ``publish`` call.
    NOTE(review): concurrent ``publish`` calls issued before the first connect
    finishes will each open a connection; calling ``connect()`` once at
    application startup avoids that.
    """

    def __init__(self, amqp_url: str = "amqp://admin:pass@localhost:5672/"):
        self.amqp_url = amqp_url
        self.connection = None  # set by connect(), cleared by close()
        self.channel = None  # set by connect(), cleared by close()

    async def connect(self):
        """Open a robust (auto-reconnecting) connection and a channel on it."""
        self.connection = await aio_pika.connect_robust(self.amqp_url)
        self.channel = await self.connection.channel()
        # No set_qos here: prefetch (basic.qos) only limits unacknowledged
        # deliveries to consumers; it has no effect on a publish-only channel.

    async def close(self):
        """Close the connection (and its channel) if one is open; safe to call repeatedly."""
        connection, self.connection, self.channel = self.connection, None, None
        if connection is not None:
            await connection.close()

    async def publish(self, queue_name: str, message: dict, priority: int = 0):
        """Publish ``message`` as persistent JSON to ``queue_name``.

        The queue is declared (idempotently) as durable with
        ``x-max-priority=10`` before publishing. Every other declaration of the
        same queue must use identical arguments, otherwise RabbitMQ closes the
        channel with PRECONDITION_FAILED.

        Args:
            queue_name: target queue; also the routing key on the default exchange.
            message: JSON-serializable payload.
            priority: message priority; this queue honours 0-10.
        """
        if not self.channel:
            await self.connect()

        await self.channel.declare_queue(
            queue_name,
            durable=True,  # the queue itself survives broker restarts
            arguments={
                "x-max-priority": 10  # enable per-message priority
            }
        )

        await self.channel.default_exchange.publish(
            aio_pika.Message(
                body=json.dumps(message, ensure_ascii=False).encode(),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,  # the message survives restarts
                priority=priority,
                headers={"source": "order-service"}
            ),
            routing_key=queue_name
        )

# Usage: one producer instance shared by the web app
producer = MessageProducer()

@app.post("/orders")
async def create_order(req: CreateOrderRequest):
    """Store the order, then publish an ``order.created`` event with priority 5.

    NOTE(review): the DB write and the publish are not atomic. If ``publish``
    raises after ``db.create_order`` succeeded, the order exists but no event
    is emitted; a transactional outbox would close that gap.
    """
    order_id = await db.create_order(req)

    line_items = [{"sku": line.sku, "qty": line.qty} for line in req.items]
    event = {
        "order_id": order_id,
        "user_id": req.user_id,
        "amount": req.amount,
        "items": line_items,
    }
    await producer.publish("order.created", event, priority=5)

    return {"order_id": order_id}

消费消息(库存服务)

# inventory-service/consumer.py
import asyncio
import json
from typing import Callable, Optional

import aio_pika


class MessageConsumer:
    """Consume JSON messages from a durable RabbitMQ queue and pass them to a handler."""

    def __init__(self, amqp_url: str = "amqp://admin:pass@localhost:5672/"):
        self.amqp_url = amqp_url

    async def consume(self, queue_name: str, handler: Callable,
                      queue_arguments: Optional[dict] = None):
        """Consume ``queue_name`` forever, calling ``handler`` with each decoded JSON body.

        Args:
            queue_name: durable queue to declare and consume from.
            handler: async callable that receives the decoded payload.
            queue_arguments: x-arguments used when declaring the queue. They
                must match every other declaration of the same queue (the
                producer in this article declares ``{"x-max-priority": 10}``),
                otherwise RabbitMQ closes the channel with PRECONDITION_FAILED.

        A message is acked when ``handler`` returns and rejected without
        requeue when decoding or ``handler`` raises.
        """
        connection = await aio_pika.connect_robust(self.amqp_url)
        try:
            channel = await connection.channel()
            # At most 10 unacknowledged messages are delivered to this consumer at once.
            await channel.set_qos(prefetch_count=10)

            queue = await channel.declare_queue(
                queue_name, durable=True, arguments=queue_arguments
            )

            async def process_message(message: aio_pika.IncomingMessage):
                payload = None  # stays None when the body is not valid JSON
                try:
                    # process() acks on normal exit and rejects (requeue=False)
                    # when the body raises. Acking/nacking manually inside it
                    # would make process() try to settle the message twice.
                    async with message.process(requeue=False):
                        payload = json.loads(message.body.decode())
                        await handler(payload)
                except Exception as e:
                    # NOTE(review): if the queue has a dead-letter exchange
                    # configured, the broker already dead-letters the rejected
                    # message; forwarding it here too would duplicate it.
                    await send_to_dead_letter_queue(queue_name, payload, str(e))

            await queue.consume(process_message)
            print(f"[*] Waiting for messages on {queue_name}. To exit press CTRL+C")
            await asyncio.Future()  # run until cancelled
        finally:
            await connection.close()

# Inventory deduction handler
async def handle_order_created(payload: dict):
    """Deduct stock for every item of an ``order.created`` event.

    All deductions run inside one ``db.transaction()``. If an item cannot be
    deducted, an ``inventory.insufficient`` event is published and an exception
    is raised out of the transaction (presumably rolling back the deductions
    already made; confirm ``db.transaction()`` semantics). ``inventory.deducted``
    is published only after the transaction block has exited successfully.

    Raises:
        Exception: when an item's stock is insufficient (or its SKU is missing).
    """
    order_id = payload["order_id"]
    items = payload["items"]

    async with db.transaction():
        for item in items:
            # Conditional UPDATE: the `stock >= :qty` check and the decrement
            # happen in one statement, so concurrent consumers cannot oversell.
            # No separate optimistic-lock version predicate is needed.
            result = await db.execute("""
                UPDATE inventory 
                SET stock = stock - :qty, version = version + 1
                WHERE sku = :sku AND stock >= :qty
            """, {"sku": item["sku"], "qty": item["qty"]})

            if result.rowcount == 0:
                # Not enough stock (or unknown SKU): emit a compensation event
                await producer.publish("inventory.insufficient", {
                    "order_id": order_id,
                    "sku": item["sku"]
                })
                raise Exception(f"库存不足: {item['sku']}")

    # Publish after the transaction block so downstream services never see
    # "deducted" for a deduction that was not committed.
    await producer.publish("inventory.deducted", {
        "order_id": order_id,
        "items": items
    })

# Entry point: start the inventory consumer on the order.created queue
if __name__ == "__main__":
    asyncio.run(
        MessageConsumer().consume("order.created", handle_order_created)
    )

实战二:Redis Streams(轻量方案)

如果你已经用了Redis,Streams几乎没有额外的部署和运维成本:

# 发布消息
import redis.asyncio as redis

async def publish_to_stream(stream_key: str, data: dict,
                            redis_url: str = "redis://localhost:6379"):
    """Append ``data`` as a single entry to the Redis stream ``stream_key``.

    ``data`` must be a flat mapping whose values Redis accepts as field values
    (str / bytes / int / float); serialize nested structures (e.g. to JSON) first.
    A new client is opened per call; share one client for high throughput.
    """
    r = await redis.from_url(redis_url)
    try:
        await r.xadd(stream_key, data, id="*")  # "*" lets Redis generate the entry ID
    finally:
        await r.aclose()  # release the connection even if XADD fails

# Consume messages (consumer-group mode; instances sharing a group split the load)
async def consume_from_stream(stream_key: str, group_name: str, consumer_name: str,
                              redis_url: str = "redis://localhost:6379"):
    """Read ``stream_key`` forever as ``consumer_name`` in group ``group_name``.

    Each entry is passed to ``process_message``. It is XACKed on success. On
    failure it is recorded in the hash ``failed:<stream_key>`` and left
    unacknowledged.
    """
    r = await redis.from_url(redis_url)
    try:
        # Create the consumer group (and the stream if missing). A BUSYGROUP
        # error only means the group already exists; anything else is real.
        try:
            await r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

        while True:
            # Block up to 5s for entries never delivered to this group (">")
            messages = await r.xreadgroup(
                group_name, consumer_name,
                {stream_key: ">"},
                count=10, block=5000
            )

            # `or []` guards against a None reply when the block times out
            for _stream, msgs in messages or []:
                for msg_id, fields in msgs:
                    try:
                        await process_message(dict(fields))
                        await r.xack(stream_key, group_name, msg_id)  # acknowledge
                    except Exception as e:
                        print(f"处理失败: {msg_id}, error: {e}")
                        # The entry stays in the group's pending list (PEL);
                        # reading with ">" never re-delivers it, so retries
                        # need XPENDING / XAUTOCLAIM.
                        await r.hset(f"failed:{stream_key}", msg_id, str(e))
    finally:
        await r.aclose()

Redis Streams vs RabbitMQ:

特性 Redis Streams RabbitMQ
运维成本 零(已有Redis) 需要独立部署
持久化 支持(AOF/RDB,异步刷盘,宕机可能丢失最近的数据) 支持
消费者组 支持 支持
延迟队列 需zset配合 需插件(rabbitmq_delayed_message_exchange)或TTL+死信实现
消息TTL 无单条TTL,只能用XTRIM按长度或最小ID裁剪 原生支持

实战三:Kafka(大数据场景)

当消息量达到百万/秒级别,Kafka是不二之选:

# 生产者
from aiokafka import AIOKafkaProducer
import json

async def kafka_producer():
    """Publish one sample ``order.created`` event to the ``order-events`` topic."""
    # Local import: this snippet's imports do not include datetime.
    from datetime import datetime, timezone

    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode(),
        acks="all",  # wait for all in-sync replicas: strongest durability
        # aiokafka has no `retries` option (that is kafka-python's API); it
        # retries retriable errors until request_timeout_ms. Idempotence keeps
        # those retries from writing duplicate records.
        enable_idempotence=True,
    )
    await producer.start()
    try:
        await producer.send_and_wait("order-events", {
            "event": "order.created",
            "order_id": "12345",
            # timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
    finally:
        await producer.stop()

# 消费者
from aiokafka import AIOKafkaConsumer

async def kafka_consumer():
    """Consume ``order-events`` as group ``inventory-service`` with at-least-once delivery.

    An offset is committed only after ``handle_order_event`` returns, so an
    event whose handler crashed is delivered again after restart. The handler
    must therefore be idempotent.
    """
    consumer = AIOKafkaConsumer(
        "order-events",
        bootstrap_servers="localhost:9092",
        group_id="inventory-service",
        auto_offset_reset="earliest",
        # Auto-commit can commit an offset before the handler finishes,
        # silently skipping that event if the process dies mid-way.
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            payload = json.loads(msg.value)
            await handle_order_event(payload)
            await consumer.commit()  # commit only after successful handling
    finally:
        await consumer.stop()

事件驱动架构设计

消息队列的最佳实践是事件驱动,而不是RPC的变种:

反模式:用消息队列做RPC

# Anti-pattern: using the message queue as if it were HTTP (request/response).
await publish("order.create", req)
response = await wait_for_response("order.create.response")  # waits synchronously for the reply, re-coupling the services

正模式:事件驱动 + 补偿

# Correct: event-driven. Each service reacts to an event and emits the next one.

# Order service: publish the event and return immediately
async def publish_order_created(order_id):
    await publish("order.created", {"order_id": order_id})


# Inventory service: on order.created, deduct stock, then publish a new event
async def on_order_created(payload: dict):
    await deduct_inventory(payload)
    await publish("inventory.deducted", {"order_id": payload["order_id"]})


# Payment service: on inventory.deducted, charge, then publish a new event
async def on_inventory_deducted(payload: dict):
    await process_payment(payload)
    await publish("payment.processed", {"order_id": payload["order_id"]})


# Order service: on payment.processed, mark the order as paid
async def on_payment_processed(payload: dict):
    await update_order_status(payload["order_id"], "paid")


# Compensation (sketch): if process_payment fails, publish e.g. "payment.failed";
# the inventory service subscribes to it and restores the deducted stock.


async def register_handlers():
    """Wire each handler to the event it consumes."""
    await subscribe("order.created", on_order_created)
    await subscribe("inventory.deducted", on_inventory_deducted)
    await subscribe("payment.processed", on_payment_processed)

幂等性处理

消息队列通常提供“至少一次”投递:网络抖动、消费者崩溃或ack丢失都会导致消息重复投递,消费者必须做幂等:

async def handle_order_created(payload: dict):
    """Idempotent handler, option 1: a unique constraint on ``order_events``.

    The dedup row and the business writes share one ``db.transaction()``. If
    the business logic fails, the marker is presumably rolled back with it
    (confirm the transaction semantics), so a redelivery can retry. A
    duplicate delivery violates the unique constraint and is skipped.
    """
    order_id = payload["order_id"]

    # Option 1: database unique constraint. Assumes a UNIQUE index on
    # order_events covering (order_id, event_type); verify against the schema.
    try:
        async with db.transaction():
            await db.execute(
                "INSERT INTO order_events (order_id, event_type, processed_at) VALUES (:oid, 'created', NOW())",
                {"oid": order_id}
            )
            # Business logic goes here, inside the same transaction...
    except IntegrityError:
        # NOTE(review): an IntegrityError raised by the business logic would
        # also land here; narrow by constraint name if that can happen.
        print(f"订单 {order_id} 已处理,跳过")
        return
# Option 2: Redis processing marker
async def handle_order_created(payload: dict):
    """Idempotent handler, option 2: a Redis key per order.

    The key is claimed with ``SET NX`` and a 60s lease while the order is being
    processed. On success it is overwritten with a long-lived ``"done"``
    marker instead of being deleted, so later redeliveries are still skipped.
    On failure it is deleted so a redelivery can retry. A plain lock that is
    deleted after success would let duplicates be processed again.
    NOTE(review): if processing takes longer than 60s, the lease expires and a
    duplicate may run concurrently.
    """
    order_id = payload["order_id"]
    lock_key = f"lock:order:{order_id}"
    done_ttl = 7 * 24 * 3600  # how long duplicates are remembered, in seconds

    r = await redis.from_url("redis://localhost:6379")
    try:
        acquired = await r.set(lock_key, "1", nx=True, ex=60)  # 60s processing lease
        if not acquired:
            # The client has no decode_responses, so GET returns bytes
            if await r.get(lock_key) == b"done":
                print(f"订单 {order_id} 已处理,跳过")
            else:
                print(f"订单 {order_id} 正在处理中,跳过")
            return

        try:
            await process_order(order_id)
        except Exception:
            await r.delete(lock_key)  # free the key so a redelivery can retry
            raise
        await r.set(lock_key, "done", ex=done_ttl)
    finally:
        await r.aclose()

死信队列(DLQ)处理失败消息

# Dead-letter queue setup
async def setup_dead_letter_queue(channel: aio_pika.Channel):
    """Declare the dead-letter exchange/queue, then the main queue that routes to them.

    Messages rejected without requeue from ``order.created`` are republished
    by RabbitMQ to exchange ``dlx`` with routing key ``order.created.dlq`` and
    land in the ``order.created.dlq`` queue.
    """
    # Declare the DLX first and make it durable. A non-durable exchange vanishes
    # on broker restart, and dead letters whose DLX does not exist are dropped.
    # (Re-declaring an existing non-durable "dlx" as durable fails with
    # PRECONDITION_FAILED; delete the old one first.)
    dlx = await channel.declare_exchange(
        "dlx", aio_pika.ExchangeType.DIRECT, durable=True
    )

    # Dead-letter queue
    dlq = await channel.declare_queue("order.created.dlq", durable=True)
    await dlq.bind(dlx, "order.created.dlq")

    # Main queue, pointing at the DLX. NOTE(review): every declaration of
    # "order.created" must use identical x-arguments; the producer and
    # consumer snippets in this article declare it differently
    # (x-max-priority / none), which RabbitMQ rejects with PRECONDITION_FAILED.
    await channel.declare_queue(
        "order.created",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "order.created.dlq"
        }
    )

# Consume the dead-letter queue (for manual intervention or delayed retry)
def dead_letter_reason(headers) -> str:
    """Return the reason RabbitMQ recorded when it dead-lettered a message.

    RabbitMQ stores this in the ``x-death`` header: a list of tables, most
    recent death first, each with a ``reason`` such as ``"rejected"`` or
    ``"expired"``. Newer brokers also set ``x-last-death-reason``. There is no
    ``x-died-reason`` header. Returns ``"unknown"`` when no reason is present.
    """
    headers = headers or {}
    reason = None
    deaths = headers.get("x-death")
    if deaths:
        reason = deaths[0].get("reason")
    if reason is None:
        reason = headers.get("x-last-death-reason")
    if reason is None:
        return "unknown"
    if isinstance(reason, bytes):
        reason = reason.decode("utf-8", errors="replace")
    return str(reason)


async def consume_dlq():
    """Start consuming ``order.created.dlq``, recording each message in ``failed_messages``.

    Returns the connection so the caller can close it on shutdown.
    """
    connection = await aio_pika.connect_robust("amqp://admin:pass@localhost:5672/")
    channel = await connection.channel()
    dlq = await channel.declare_queue("order.created.dlq", durable=True)

    async def handle_failed(msg: aio_pika.IncomingMessage):
        raw = msg.body.decode("utf-8", errors="replace")
        try:
            stored = json.dumps(json.loads(raw))  # normalized JSON text
        except ValueError:
            stored = raw  # keep malformed bodies verbatim instead of crashing
        error = dead_letter_reason(msg.headers)

        # Record in the database for manual handling
        await db.execute(
            "INSERT INTO failed_messages (payload, error, created_at) VALUES (:p, :e, NOW())",
            {"p": stored, "e": error}
        )
        await msg.ack()  # ack() is a coroutine; without await the message is never acked

    await dlq.consume(handle_failed)
    return connection

MonkeyCode Prompt模板

我有一个[电商/社交/物联网]系统,需要引入消息队列解耦[具体场景]。
请帮我:
1. 选型建议(RabbitMQ/Redis/Kafka),给出对比
2. 设计事件类型和payload结构
3. 生成生产者和消费者代码(含幂等处理)
4. 配置死信队列和重试机制
5. 生成Docker Compose部署配置
6. 编写单元测试和集成测试

踩过的坑

  1. 消息丢失:没开持久化,RabbitMQ重启后消息全丢 → 队列声明为durable,同时消息设置delivery_mode=PERSISTENT(两者缺一不可)
  2. 重复消费:消费者没做幂等,网络超时时重试导致重复处理 → 数据库唯一约束(与业务写入放在同一事务)或Redis“已处理”标记(处理成功后保留,而不是删除锁)
  3. 队列堆积:消费者处理速度跟不上生产速度 → 增加消费者实例 + 监控队列长度
  4. 内存告警:RabbitMQ内存占用超过vm_memory_high_watermark(3.x默认0.4,即40%物理内存)时会阻塞所有生产者 → 监控内存、避免队列堆积,并按需调整该阈值

总结

消息队列的核心价值是解耦削峰。选RabbitMQ做业务消息,用Redis Streams做轻量场景,上Kafka处理大数据流。MonkeyCode能帮你从选型、代码生成到部署配置一站式搞定。

记住:事件驱动,不是RPC。消息发出去就别等回复,用事件链串起整个业务流程。

文章摘自:https://www.cnblogs.com/jaryn/p/20224094