Apache Kafka in Practice: A Complete Guide to Message Queues with Spring Boot

Preface

Apache Kafka is the de facto standard for distributed message queues. This article walks through integrating Kafka with Spring Boot, covering complete producer and consumer development.

1. Kafka Core Concepts

  • Producer: publishes messages
  • Consumer: reads messages
  • Broker: a Kafka server node
  • Topic: a named category of messages
  • Partition: a shard of a topic; enables parallel processing
  • Consumer Group: a set of consumers that share a topic's load
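The record key decides which partition a message lands on, which is how per-key ordering is preserved. As a rough dependency-free illustration (not Kafka's real algorithm — the default partitioner hashes the serialized key bytes with murmur2, not String.hashCode), keyed records map deterministically to a partition:

```java
public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner: records with the
    // same non-null key always map to the same partition.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-1001", 3);
        int p2 = partitionFor("order-1001", 3);
        System.out.println(p1 == p2);  // same key -> same partition
    }
}
```

This is why the producer below sends the orderId as the record key: all events for one order stay on one partition and are consumed in order.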

2. Installing Kafka with Docker

# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Needed on a single broker: the image defaults offsets.topic.replication.factor to 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

3. Integrating Kafka with Spring Boot

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4. Producer

import java.time.LocalDateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrder(String orderId) {
        String message = "{\"orderId\":\"" + orderId + "\",\"time\":\"" + LocalDateTime.now() + "\"}";
        // orderId as the record key: all events for one order go to one partition
        kafkaTemplate.send("order-topic", orderId, message);
        System.out.println("Sent: " + message);
    }
}
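One caveat with the string-concatenated payload above: an orderId containing a quote or backslash would produce invalid JSON. In a real project you would serialize a DTO with Jackson's ObjectMapper; as a dependency-free sketch, the special characters can be escaped by hand:

```java
public class OrderMessageBuilder {
    // Escape the characters that would break a JSON string literal.
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    static String orderMessage(String orderId, String time) {
        return "{\"orderId\":\"" + jsonEscape(orderId) + "\",\"time\":\"" + time + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(orderMessage("1001", "2024-01-01T00:00:00"));
    }
}
```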

5. Consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

    @KafkaListener(topics = "order-topic", groupId = "my-group")
    public void consume(ConsumerRecord<String, String> record) {
        System.out.println("Received: key=" + record.key());
        System.out.println("Value: " + record.value());
        System.out.println("Partition: " + record.partition());
        System.out.println("Offset: " + record.offset());
    }
}

6. Manual Offset Commit

@KafkaListener(topics = "order-topic", groupId = "my-group")
public void consumeWithManualAck(ConsumerRecord<String, String> record,
                                 Acknowledgment ack) {
    try {
        processOrder(record.value());
        ack.acknowledge();  // commit the offset manually
    } catch (Exception e) {
        // Not acknowledging leaves the offset uncommitted, so the message
        // will be redelivered after a rebalance or restart
        log.error("Failed to process: {}", record.key(), e);
    }
}

# Enable manual commit
spring.kafka.listener.ack-mode: manual
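Because an unacknowledged message is redelivered, this gives at-least-once delivery: the same order can reach the consumer twice. A common defense is to make processing idempotent, for example by tracking already-handled IDs. Below is a minimal in-memory sketch (production code would typically record processed IDs in a database or Redis instead):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentOrderProcessor {
    // IDs of orders already handled; ConcurrentHashMap-backed for thread safety
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Returns true if the order was processed now, false if it was a duplicate
    public boolean process(String orderId) {
        if (!processed.add(orderId)) {
            return false;  // already seen: skip the business logic
        }
        // ... actual order handling would go here ...
        return true;
    }

    public static void main(String[] args) {
        IdempotentOrderProcessor p = new IdempotentOrderProcessor();
        System.out.println(p.process("order-1001"));  // true: first delivery
        System.out.println(p.process("order-1001"));  // false: redelivery ignored
    }
}
```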

7. Command Cheat Sheet

# Create a topic
kafka-topics.sh --create --topic order-topic \
  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# List consumer groups
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# Console consumer
kafka-console-consumer.sh --topic order-topic \
  --bootstrap-server localhost:9092 --from-beginning

Summary

Kafka is a core component for asynchronous communication in microservice architectures. Key takeaways: the producer/consumer model decouples services, partitions enable parallel processing, consumer groups provide load balancing, and manually committed offsets improve delivery reliability.

If you found this helpful, please like and bookmark! Questions are welcome in the comments.

Source: https://www.cnblogs.com/czlws/p/19824559/apache-kafka-spring-boot-message-queue