同步调用是微服务耦合的根源。消息队列解耦了服务,也救了你的响应时间。用MonkeyCode,从选型到实现一把梭。
为什么需要消息队列?
一个典型的电商下单流程,同步调用长这样:
# 同步调用:创建订单要等所有下游返回
@app.post("/orders")
async def create_order(req: CreateOrderRequest):
user = await get_user(req.user_id) # 50ms
payment = await process_payment(req) # 200ms
inventory = await deduct_inventory(req) # 100ms
notify = await send_notification(req) # 80ms
log = await write_order_log(req) # 30ms
return {"status": "ok"} # 总共 460ms
用户点击”下单”后,要等460ms才能看到结果。而且任何一个服务挂了,下单就失败了。
引入消息队列后:
# 异步处理:下单只管写队列,20ms内返回
@app.post("/orders")
async def create_order(req: CreateOrderRequest):
order_id = await save_order(req) # 写数据库 20ms
await publish_event("order.created", { # 发消息 5ms
"order_id": order_id,
"user_id": req.user_id,
"amount": req.amount
})
return {"order_id": order_id, "status": "processing"} # 25ms
下游的支付、扣库存、发通知,全部异步处理。用户体验快了18倍。
主流消息队列选型
MonkeyCode帮你选:
| 方案 | 适用场景 | 吞吐量 | 延迟 |
|---|---|---|---|
| RabbitMQ | 业务消息、延迟队列 | 中等(万级/秒) | 毫秒级 |
| Redis Streams | 轻量场景、已有Redis | 高(十万级/秒) | 毫秒级 |
| Kafka | 日志流、大数据管道 | 极高(百万级/秒) | 毫秒~秒级 |
| Amazon SQS | 云原生、无运维 | 高 | 秒级 |
实战一:RabbitMQ + FastAPI
安装配置
让MonkeyCode生成Docker配置:
为RabbitMQ生成docker-compose配置,包含管理界面,默认用户密码admin/pass
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.12-management
ports:
- "5672:5672" # AMQP端口
- "15672:15672" # 管理界面
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: pass
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:
启动后访问 http://localhost:15672,用admin/pass登录。
发布消息(订单服务)
# order-service/producer.py
import aio_pika
import json
from typing import Any
class MessageProducer:
def __init__(self, amqp_url: str = "amqp://admin:pass@localhost:5672/"):
self.amqp_url = amqp_url
self.connection = None
self.channel = None
async def connect(self):
self.connection = await aio_pika.connect_robust(self.amqp_url)
self.channel = await self.connection.channel()
# 设置QoS,每次最多处理10条消息
await self.channel.set_qos(prefetch_count=10)
async def publish(self, queue_name: str, message: dict, priority: int = 0):
if not self.channel:
await self.connect()
# 声明队列(幂等操作)
queue = await self.channel.declare_queue(
queue_name,
durable=True, # 队列持久化
arguments={
"x-max-priority": 10 # 支持优先级
}
)
await self.channel.default_exchange.publish(
aio_pika.Message(
body=json.dumps(message, ensure_ascii=False).encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 消息持久化
priority=priority,
headers={"source": "order-service"}
),
routing_key=queue_name
)
# 使用
producer = MessageProducer()
@app.post("/orders")
async def create_order(req: CreateOrderRequest):
order_id = await db.create_order(req)
await producer.publish("order.created", {
"order_id": order_id,
"user_id": req.user_id,
"amount": req.amount,
"items": [{"sku": item.sku, "qty": item.qty} for item in req.items]
}, priority=5)
return {"order_id": order_id}
消费消息(库存服务)
# inventory-service/consumer.py
import aio_pika
import asyncio
from typing import Callable
class MessageConsumer:
def __init__(self, amqp_url: str = "amqp://admin:pass@localhost:5672/"):
self.amqp_url = amqp_url
async def consume(self, queue_name: str, handler: Callable):
connection = await aio_pika.connect_robust(self.amqp_url)
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue(queue_name, durable=True)
async def process_message(message: aio_pika.IncomingMessage):
async with message.process(requeue=False): # 处理失败不自动重入队
try:
payload = json.loads(message.body.decode())
await handler(payload)
except Exception as e:
# 发送到死信队列
await message.nack(requeue=False)
await send_to_dead_letter_queue(queue_name, payload, str(e))
await queue.consume(process_message)
print(f"[*] Waiting for messages on {queue_name}. To exit press CTRL+C")
await asyncio.Future() # 永久运行
# 库存扣减处理器
async def handle_order_created(payload: dict):
order_id = payload["order_id"]
items = payload["items"]
async with db.transaction():
for item in items:
# 乐观锁扣库存
result = await db.execute("""
UPDATE inventory
SET stock = stock - :qty, version = version + 1
WHERE sku = :sku AND stock >= :qty AND version = :version
""", {"sku": item["sku"], "qty": item["qty"], "version": current_version})
if result.rowcount == 0:
# 库存不足,触发补偿事件
await producer.publish("inventory.insufficient", {
"order_id": order_id,
"sku": item["sku"]
})
raise Exception(f"库存不足: {item['sku']}")
# 扣库存成功,触发支付事件
await producer.publish("inventory.deducted", {
"order_id": order_id,
"items": items
})
# 启动消费者
if __name__ == "__main__":
consumer = MessageConsumer()
asyncio.run(consumer.consume("order.created", handle_order_created))
实战二:Redis Streams(轻量方案)
如果你已经用了Redis,Streams是零成本方案:
# 发布消息
import redis.asyncio as redis
async def publish_to_stream(stream_key: str, data: dict):
r = await redis.from_url("redis://localhost:6379")
await r.xadd(stream_key, data, id="*") # * 让Redis自动生成ID
await r.aclose()
# 消费消息(消费者组模式,支持多实例负载均衡)
async def consume_from_stream(stream_key: str, group_name: str, consumer_name: str):
r = await redis.from_url("redis://localhost:6379")
# 创建消费者组(幂等)
try:
await r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
except redis.ResponseError:
pass # 组已存在
while True:
# 读取新消息(阻塞式)
messages = await r.xreadgroup(
group_name, consumer_name,
{stream_key: ">"}, # ">" 表示读取未消费的消息
count=10, block=5000
)
for stream, msgs in messages:
for msg_id, fields in msgs:
try:
await process_message(dict(fields))
await r.xack(stream_key, group_name, msg_id) # 确认消费
except Exception as e:
print(f"处理失败: {msg_id}, error: {e}")
# 可以记录到特殊的处理失败集合
await r.hset(f"failed:{stream_key}", msg_id, str(e))
Redis Streams vs RabbitMQ:
| 特性 | Redis Streams | RabbitMQ |
|---|---|---|
| 运维成本 | 零(已有Redis) | 需要独立部署 |
| 持久化 | 支持(AOF/RDB) | 支持 |
| 消费者组 | 支持 | 支持 |
| 延迟队列 | 需zset配合 | 原生支持 |
| 消息TTL | 需xtrim | 原生支持 |
实战三:Kafka(大数据场景)
当消息量达到百万/秒级别,Kafka是不二之选:
# 生产者
from aiokafka import AIOKafkaProducer
import json
async def kafka_producer():
producer = AIOKafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode(),
acks="all", # 所有副本确认,最强可靠性
retries=3
)
await producer.start()
try:
await producer.send_and_wait("order-events", {
"event": "order.created",
"order_id": "12345",
"timestamp": datetime.utcnow().isoformat()
})
finally:
await producer.stop()
# 消费者
from aiokafka import AIOKafkaConsumer
async def kafka_consumer():
consumer = AIOKafkaConsumer(
"order-events",
bootstrap_servers="localhost:9092",
group_id="inventory-service",
auto_offset_reset="earliest"
)
await consumer.start()
try:
async for msg in consumer:
payload = json.loads(msg.value)
await handle_order_event(payload)
finally:
await consumer.stop()
事件驱动架构设计
消息队列的最佳实践是事件驱动,而不是RPC的变种:
反模式:用消息队列做RPC
# 错误:把消息队列当HTTP用
await publish("order.create", req)
response = await wait_for_response("order.create.response") # 同步等待
正模式:事件驱动 + 补偿
# 正确:事件驱动
# 订单服务:发布事件
await publish("order.created", { "order_id": order_id })
# 库存服务:订阅事件,处理成功后发布新事件
await subscribe("order.created", async (payload) => {
await deduct_inventory(payload)
await publish("inventory.deducted", { "order_id": payload.order_id })
})
# 支付服务:订阅库存扣减事件
await subscribe("inventory.deducted", async (payload) => {
await process_payment(payload)
await publish("payment.processed", { "order_id": payload.order_id })
})
# 订单服务:订阅最终完成事件
await subscribe("payment.processed", async (payload) => {
await update_order_status(payload.order_id, "paid")
})
幂等性处理
网络抖动会导致消息重复投递,消费者必须做幂等:
async def handle_order_created(payload: dict):
order_id = payload["order_id"]
# 方案1:数据库唯一约束
try:
await db.execute(
"INSERT INTO order_events (order_id, event_type, processed_at) VALUES (:oid, 'created', NOW())",
{"oid": order_id}
)
except IntegrityError:
print(f"订单 {order_id} 已处理,跳过")
return
# 业务逻辑...
# 方案2:Redis分布式锁
async def handle_order_created(payload: dict):
order_id = payload["order_id"]
lock_key = f"lock:order:{order_id}"
r = await redis.from_url("redis://localhost:6379")
acquired = await r.set(lock_key, "1", nx=True, ex=60) # 60秒过期
if not acquired:
print(f"订单 {order_id} 正在处理中,跳过")
return
try:
await process_order(order_id)
finally:
await r.delete(lock_key)
死信队列(DLQ)处理失败消息
# 定义死信队列
async def setup_dead_letter_queue(channel: aio_pika.Channel):
# 主队列,绑定死信交换机
await channel.declare_queue(
"order.created",
durable=True,
arguments={
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "order.created.dlq"
}
)
# 死信交换机
dlx = await channel.declare_exchange("dlx", aio_pika.ExchangeType.DIRECT)
# 死信队列
dlq = await channel.declare_queue("order.created.dlq", durable=True)
await dlq.bind(dlx, "order.created.dlq")
# 消费死信队列(人工介入或延迟重试)
async def consume_dlq():
connection = await aio_pika.connect_robust("amqp://admin:pass@localhost:5672/")
channel = await connection.channel()
dlq = await channel.declare_queue("order.created.dlq", durable=True)
async def handle_failed(msg: aio_pika.IncomingMessage):
payload = json.loads(msg.body.decode())
error = msg.headers.get("x-died-reason", "unknown")
# 记录到数据库,等待人工处理
await db.execute(
"INSERT INTO failed_messages (payload, error, created_at) VALUES (:p, :e, NOW())",
{"p": json.dumps(payload), "e": error}
)
msg.ack()
await dlq.consume(handle_failed)
MonkeyCode Prompt模板
我有一个[电商/社交/物联网]系统,需要引入消息队列解耦[具体场景]。
请帮我:
1. 选型建议(RabbitMQ/Redis/Kafka),给出对比
2. 设计事件类型和payload结构
3. 生成生产者和消费者代码(含幂等处理)
4. 配置死信队列和重试机制
5. 生成Docker Compose部署配置
6. 编写单元测试和集成测试
踩过的坑
- 消息丢失:没开持久化,RabbitMQ重启后消息全丢 → 设置
delivery_mode=PERSISTENT - 重复消费:消费者没做幂等,网络超时时重试导致重复处理 → 数据库唯一约束或Redis锁
- 队列堆积:消费者处理速度跟不上生产速度 → 增加消费者实例 + 监控队列长度
- 内存泄漏:RabbitMQ默认不限制内存使用 → 配置
vm_memory_high_watermark=0.4
总结
消息队列的核心价值是解耦和削峰。选RabbitMQ做业务消息,用Redis Streams做轻量场景,上Kafka处理大数据流。MonkeyCode能帮你从选型、代码生成到部署配置一站式搞定。
记住:事件驱动,不是RPC。消息发出去就别等回复,用事件链串起整个业务流程。
文章摘自:https://www.cnblogs.com/jaryn/p/20224094
