当前位置:首页 > Java > 正文

Java语言Kafka集成教程(手把手教你用Java连接Apache Kafka)

在现代分布式系统中,Apache Kafka 是一个非常流行的高吞吐量、低延迟的消息队列系统。很多企业使用 Kafka 来处理日志收集、事件流、实时数据管道等场景。如果你正在学习 Java 开发,并希望将 Kafka 集成到你的项目中,那么本篇 Kafka集成教程 将从零开始,一步步带你完成配置、编码和测试全过程。

一、准备工作

在开始之前,请确保你已经安装并启动了以下组件:

  • Java 8 或更高版本(推荐 Java 11+)
  • Apache Kafka(可本地安装或使用 Docker)
  • Maven 或 Gradle 构建工具
Java语言Kafka集成教程(手把手教你用Java连接Apache Kafka) Kafka集成教程 Java Kafka消费者 Kafka生产者 Spring Boot集成Kafka 第1张

二、创建 Maven 项目

我们使用 Maven 来管理依赖。打开你的 IDE(如 IntelliJ IDEA 或 Eclipse),新建一个 Maven 项目,并在 pom.xml 中添加 Kafka 客户端依赖:

<dependencies>    <!-- Kafka 客户端依赖 -->    <dependency>        <groupId>org.apache.kafka</groupId>        <artifactId>kafka-clients</artifactId>        <version>3.5.0</version>    </dependency>    <!-- 日志依赖(可选)-->    <dependency>        <groupId>org.slf4j</groupId>        <artifactId>slf4j-simple</artifactId>        <version>2.0.7</version>    </dependency></dependencies>

三、编写 Kafka 生产者(Producer)

Kafka 生产者负责向 Kafka 主题(Topic)发送消息。下面是一个简单的 Java Kafka生产者 示例:

import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class SimpleProducer {    public static void main(String[] args) {        // 配置生产者属性        Properties props = new Properties();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        // 创建生产者实例        Producer<String, String> producer = new KafkaProducer<>(props);        // 发送消息        for (int i = 0; i < 5; i++) {            ProducerRecord<String, String> record =                 new ProducerRecord<>("my-topic", "key-" + i, "Hello Kafka Message " + i);            producer.send(record, (metadata, exception) -> {                if (exception != null) {                    exception.printStackTrace();                } else {                    System.out.printf("Sent: %s to partition %d, offset %d%n",                        record.value(), metadata.partition(), metadata.offset());                }            });        }        // 关闭生产者        producer.close();    }}

注意:请确保 Kafka 服务已在 localhost:9092 启动,并且主题 my-topic 已存在。如果没有,可以使用以下命令创建:

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

四、编写 Kafka 消费者(Consumer)

接下来,我们实现一个 Java Kafka消费者 来接收刚刚发送的消息:

import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class SimpleConsumer {    public static void main(String[] args) {        // 配置消费者属性        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        // 创建消费者实例        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);        // 订阅主题        consumer.subscribe(Collections.singletonList("my-topic"));        // 持续拉取消息        try {            while (true) {                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));                for (ConsumerRecord<String, String> record : records) {                    System.out.printf("Received: key=%s, value=%s, partition=%d, offset=%d%n",                        record.key(), record.value(), record.partition(), record.offset());                }            }        } finally {            consumer.close();        }    }}

五、Spring Boot 集成 Kafka(进阶)

如果你使用的是 Spring Boot 框架,集成 Kafka 会更加简单。只需添加 spring-kafka 依赖,并通过注解配置即可。这是很多开发者选择的方式,也属于 Spring Boot集成Kafka 的标准做法。

pom.xml 中添加:

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>

然后在 application.yml 中配置:

spring:  kafka:    bootstrap-servers: localhost:9092    producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      group-id: my-spring-group      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

之后就可以使用 @KafkaListener 注解监听消息,或使用 KafkaTemplate 发送消息,极大简化开发流程。

六、总结

通过本篇 Kafka集成教程,你已经学会了如何使用 Java 原生 API 编写 Kafka 生产者和消费者,也了解了如何在 Spring Boot 项目中快速集成 Kafka。无论你是初学者还是有一定经验的开发者,掌握这些基础技能都将为你构建高可用、高性能的分布式系统打下坚实基础。

记住关键词:Kafka集成教程Java Kafka消费者Java Kafka生产者Spring Boot集成Kafka——它们是你深入学习 Kafka 的重要方向。

祝你编码顺利,消息不丢!