前言
Apache Kafka 是分布式消息队列的事实标准,本文带你实战 Spring Boot 整合 Kafka,完成生产者和消费者的完整开发。
一、Kafka 核心概念
- Producer:消息生产者
- Consumer:消息消费者
- Broker:Kafka 服务节点
- Topic:消息主题分类
- Partition:Topic 的分区,实现并行处理
- Consumer Group:消费者组,实现负载均衡
二、Docker 安装 Kafka
# docker-compose.yml
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on: [zookeeper]
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
三、Spring Boot 整合 Kafka
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
四、生产者
@Service
public class OrderProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendOrder(String orderId) {
String message = "{\"orderId\":\""+orderId+"\",\"time\":\""+LocalDateTime.now()+"\"}";
kafkaTemplate.send("order-topic", orderId, message);
System.out.println("Sent: " + message);
}
}
五、消费者
@Component
public class OrderConsumer {
@KafkaListener(topics = "order-topic", groupId = "my-group")
public void consume(ConsumerRecord<String, String> record) {
System.out.println("Received: key=" + record.key());
System.out.println("Value: " + record.value());
System.out.println("Partition: " + record.partition());
System.out.println("Offset: " + record.offset());
}
}
六、手动提交 Offset
@KafkaListener(topics = "order-topic", groupId = "my-group")
public void consumeWithManualAck(
ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
processOrder(record.value());
ack.acknowledge(); // 手动确认
} catch (Exception e) {
// 处理失败,消息会被重新消费
log.error("Failed to process: {}", record.key(), e);
}
}
# 开启手动提交
spring.kafka.listener.ack-mode: manual
七、常用命令速查
# 创建 Topic
kafka-topics.sh --create --topic order-topic \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 控制台消费者
kafka-console-consumer.sh --topic order-topic \
--bootstrap-server localhost:9092 --from-beginning
总结
Kafka 是微服务架构中异步通信的核心组件。核心要点:Producer/Consumer 模式实现解耦、Partition 实现并行处理、Consumer Group 实现负载均衡、手动提交 Offset 保证消息可靠性。
觉得有帮助请点赞收藏!有问题欢迎评论区交流
文章摘自:https://www.cnblogs.com/czlws/p/19824559/apache-kafka-spring-boot-message-queue
