Kafka

Kafka

Kafka 可以理解为一个高性能消息中间件,主要用来:解耦系统(不同服务之间用消息传递,不直接依赖)、削峰填谷(应对突发流量)、异步通信流式数据处理

概念

Producer(生产者)

负责向 Kafka 发送消息

  • 把消息发送到某个 Topic(主题)
  • 发送时可以指定消息要写入哪个 Partition(分区)
  • 通常是“异步发送”,效率高

Consumer(消费者)

从 Kafka 中读取消息

  • 订阅一个或多个 Topic
  • 消费的方式是拉取(pull)模式
  • 多个消费者可以组成一个 Consumer Group(消费者组) 来分工处理数据

Topic(主题)

对消息进行分类和存储

  • 每个主题可以有多个 Partition(分区)
  • 生产者往主题里发消息,消费者从主题里读消息
  • 比如有主题 order_loguser_behavior

Partition(分区)

Kafka 的高性能关键所在

  • 每个 Topic 会被分成多个 Partition
  • 每个 Partition 是一个有序、不可变的消息队列
  • Partition 里的每条消息都有一个唯一的编号:offset(偏移量)
  • Kafka 通过分区来实现并行处理数据分布

Offset(偏移量)

消息在分区中的唯一位置编号

记录offset:Kafka 会为每个消费者组维护一个表<groupId、topic、partition、已消费到的 offset>

  • 每条消息在分区中都有一个 offset
  • 消费者会记录自己消费到哪里(比如 offset=120)
  • 这个 offset 用来实现断点续传,保证消息的有序性和可靠性

Broker(代理服务器)

Kafka 的服务器节点

  • 每台 Broker 就是一个 Kafka 实例
  • 集群由多个Broker 组成
  • 每个 Broker 存储若干个 Partition 的数据

Replication(副本机制)

Kafka 为每个分区保存多个副本

  • 一个分区通常有 1 个 Leader 和若干个 Follower
  • Producer 和 Consumer 只和 Leader 交互
  • 如果 Leader 挂了,会由 Follower 自动接管

ZooKeeper

​ 为了提高集群的可用性和稳定性, 架构中还会引入ZooKeeper, ZooKeeper用于维护Kafka集群中的Broker节点信息、Partition信息、Topic信息、集群管理、Leader选举、存储Metadata信息等

​ 在Kafka中,每个Broker都会向ZooKeeper注册自己的节点信息,包括Broker ID、IP地址和端口号等。同时,每个PartitionMetadata信息也会存储在ZooKeeper中,包括该PartitionReplica信息、Leader信息、ISR信息等。当Broker加入或退出集群时,ZooKeeper会自动通知其他Broker更新集群的状态信息。在Leader选举时,ZooKeeper会根据预设的算法选举出新的Leader,并通知其他Broker更新Partition的状态信息。

主要架构

  • Producer将消息发送到Broker节点,Broker将消息存储到对应的Partition
  • 每个Partition可以有多个Replica,其中一个Replica被选为Leader,其余ReplicaFollower
  • Leader负责处理消息的写操作,将消息追加到Partition中
  • Follower负责与Leader保持同步,定期从Leader中拉取消息并复制到本地副本中,以保证数据的一致性
  • ConsumerBroker中读取消息,可以指定消费某个Topic中的指定Partition中的消息,也可以进行批量消费或实时消费
  • Broker将消息存储在磁盘中,同时也会缓存部分消息到内存中,以提高读写性能

搭建Kafka集群

Kafka Assistant

官网:https://www.redisant.com/ka

测试、查看Kafka的GUI工具

在Linux的Docker上

三个Zookeeper、三个Kafka及一个Kafka-UI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
version: '3.1'

services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
volumes:
- ./data/zookeeper/zoo1/data:/data
- ./data/zookeeper/zoo1/datalog:/datalog

zoo2:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo2
container_name: zoo2
ports:
- "2182:2182"
environment:
ZOOKEEPER_CLIENT_PORT: 2182
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
volumes:
- ./data/zookeeper/zoo2/data:/data
- ./data/zookeeper/zoo2/datalog:/datalog

zoo3:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo3
container_name: zoo3
ports:
- "2183:2183"
environment:
ZOOKEEPER_CLIENT_PORT: 2183
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
volumes:
- ./data/zookeeper/zoo3/data:/data
- ./data/zookeeper/zoo3/datalog:/datalog



kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
volumes:
- ./data/kafka_data1:/kafka/data
depends_on:
- zoo1
- zoo2
- zoo3

kafka2:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka2
container_name: kafka2
ports:
- "9093:9093"
- "29093:29093"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
volumes:
- ./data/kafka_data2:/kafka/data
depends_on:
- zoo1
- zoo2
- zoo3

kafka3:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka3
container_name: kafka3
ports:
- "9094:9094"
- "29094:29094"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
KAFKA_BROKER_ID: 3
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
volumes:
- ./data/kafka_data3:/kafka/data
depends_on:
- zoo1
- zoo2
- zoo3

kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 9999:8080
depends_on:
- kafka1
- kafka2
- kafka3

environment:
KAFKA_CLUSTERS_0_NAME: k1
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092

KAFKA_CLUSTERS_1_NAME: k2
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:29093

KAFKA_CLUSTERS_2_NAME: k3
KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: kafka3:29094

解释

  • Zookeeper

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 每个节点唯一编号
    ZOOKEEPER_SERVER_ID: 1
    # 这表示这三台 ZooKeeper 会通过 2888(同步端口)和 3888(选举端口)组成一个ZK集群
    ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888

    # 数据挂载的位置
    volumes:
    - ./data/zookeeper/zoo1/data:/data
    - ./data/zookeeper/zoo1/datalog:/datalog
  • Kafka

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    environment:
    # 定义 Kafka 访问地址
    # PLAINTEXT:当客户端(生产者、消费者、UI 等)连接 Kafka 时使用的接口,明文协议
    # INTERNAL:容器内部通信(Kafka Broker 之间用)
    # EXTERNAL:主机(宿主机)访问
    # DOCKER:Docker 网络中的访问方式
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,INTERNAL://kafka1:19092
    # 定义通信协议
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
    # 指定 Broker 间通信通道
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    # 连接 ZooKeeper 集群的地址
    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
    # 唯一编号
    KAFKA_BROKER_ID: 1
    # 系统级主题(__consumer_offsets)的副本数量配置,存储消费者的消费进度
    # 建议有几个kafka就设置几
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    # 至少要有多少个副本同步成功,才能算写入成功
    KAFKA_MIN_INSYNC_REPLICAS: 2
    # 日志目录
    KAFKA_LOG_DIRS: /kafka/data

在Docker Desktop上

保存为docker-compose.yml文件,执行docker-compose up -d命令安装,会在所在文件夹下挂载数据文件

通过kafka-ui的http://localhost:9999/可查看节点状态

一个Zookeeper、两个Kafka及一个Kafka-UI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- ./data/zookeeper:/data
- ./data/zookeeper-log:/datalog

kafka1:
image: confluentinc/cp-kafka:7.3.2
container_name: kafka1
ports:
# 映射到宿主机,供外部工具(如 Kafka Assistant)连接
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

# 1. 定义两个监听器:
# - INTERNAL: 供 Docker 容器内部(如 kafka-ui)使用,使用服务名
# - EXTERNAL: 供宿主机外部工具(如 Kafka Assistant)使用,使用 localhost
KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092

# 2. 告诉客户端 Broker 的正确地址(关键步骤!)
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:29092,EXTERNAL://localhost:9092

# 3. 映射安全协议
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

# 4. 设置 Broker 之间通信使用的监听器名称
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_MIN_INSYNC_REPLICAS: 1
KAFKA_LOG_DIRS: /kafka/data
volumes:
- ./data/kafka1:/kafka/data
depends_on:
- zookeeper

kafka2:
image: confluentinc/cp-kafka:7.3.2
container_name: kafka2
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

# 1. 定义两个监听器
KAFKA_LISTENERS: INTERNAL://:29093,EXTERNAL://:9093

# 2. 告诉客户端 Broker 的正确地址
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:29093,EXTERNAL://localhost:9093

# 3. 映射安全协议
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

# 4. 设置 Broker 之间通信使用的监听器名称
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_MIN_INSYNC_REPLICAS: 1
KAFKA_LOG_DIRS: /kafka/data
volumes:
- ./data/kafka2:/kafka/data
depends_on:
- zookeeper

kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "9999:8080"
environment:
KAFKA_CLUSTERS_0_NAME: LocalCluster
# 使用内部监听器 (INTERNAL) 的地址:服务名:内部端口
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29093
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
depends_on:
- kafka1
- kafka2

SpringBoot整合Kafka

快速使用

Spring Boot 版本为3.5.6

  1. 添加依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>

    <!-- kafka -->
    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- fast-json -->
    <dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.59</version>
    </dependency>

    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    </dependency>

    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    </dependency>
    </dependencies>
  2. 添加配置

    1
    2
    3
    4
    5
    server:
    port: 8081
    spring:
    kafka:
    bootstrap-servers: 127.0.0.1:9092, 127.0.0.1:9093
  3. 使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Slf4j
    @RestController
    public class HelloController {
    private static final String topic = "test";

    @Resource
    private KafkaTemplate<Object, Object> kafkaTemplate;

    // 消费者,可指定消费者id、监听的topic
    @KafkaListener(id = "helloGroup", topics = topic)
    public void listen(String msg) {
    log.info("hello receive value: {}", msg);
    }

    @GetMapping("hello")
    public String hello() {
    // 生产者,发送消息
    kafkaTemplate.send(topic, "hello");
    return "hello";
    }
    }

创建Topic

Topic 相当于一个命名的通道或类别,用于组织和存储特定的数据流。

  • 生产者 (Producer) 将数据发布(写入)到一个特定的 Topic。
  • 消费者 (Consumer) 从它订阅的一个或多个 Topic 中读取数据。

自动创建

向不存在的topic发送消息,会自动创建,

默认分区数(Partitions) = 1,副本数(Replica) = 1

手动创建

​ 只要你在项目中引入了 Spring Kafka 依赖,并且配置了 spring.kafka.bootstrap-servers,Spring Boot 就会自动配置一个 KafkaAdmin Bean。正是这个 Bean 负责扫描所有的 NewTopic Bean 并与 Kafka Broker 进行通信以创建 Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class KafkaConfig {

// 简单单分区Topic
@Bean
public NewTopic myTopic() {
return TopicBuilder.name("my-first-topic")
.partitions(1) // 设置分区数为 1
.replicas(1) // 设置副本因子为 1
.build();
}

// 多个分区和副本的Topic
@Bean
public NewTopic userEventsTopic() {
return TopicBuilder.name("user-events-topic")
.partitions(3) // 设置分区数为 3
.replicas(2) // 设置副本因子为 2 (需要至少 2 个 Kafka Broker)
.compact() // 日志压缩,日志中只会保留key对应的最新值
.build();
}
}

有些高级设置需要通过.config()来配置

发送消息

简单异步发送

1
2
3
4
5
6
7
8
@Resource
private KafkaTemplate<Object, Object> kafkaTemplate;

@GetMapping("hello")
public String hello() {
kafkaTemplate.send(topic, "hello");
return "hello";
}

同步获取发送结果

1
2
3
4
5
6
7
8
9
10
11
12
13
@GetMapping("/hello2")
public String hello2() {
// 同步获取结果
CompletableFuture<SendResult<Object, Object>> future = kafkaTemplate.send("hello2","hello2");
try {
SendResult<Object,Object> result = future.get();
log.info("success >>> {}", result.getRecordMetadata().topic()); // success >>> hello2
}catch (Throwable e){
e.printStackTrace();
}

return "hello2";
}

异步获取发送结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@GetMapping("/hello3")
public String hello3(String topic, String key, String message) {
CompletableFuture<SendResult<Object, Object>> future = kafkaTemplate.send("hello2", "async hello2");

future.whenComplete((result, throwable) -> {
if (throwable != null) {
// 发送失败处理
log.error("fail >>>>{}", throwable.getMessage());
} else {
// 发送成功处理
log.info("async success >>> {}", result.getRecordMetadata().topic());
}
});

return "hello3";
}

消息的请求应答

在普通 Kafka 使用场景中,我们的消息是单向发送的:Producer → Topic → Consumer

而有些业务场景中需要请求-响应机制

  • 发送请求消息到 Kafka;
  • 然后等待 Consumer 处理完后回发响应;
  • Producer 收到响应再继续业务逻辑。

Spring Kafka 提供了一个专门的工具类ReplyingKafkaTemplate<K, V, R>来实现这种“请求—回复”通信

  1. 请求响应配置类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    @Configuration
    public class KafkaRequestReplyConfig {

    public static final String REPLY_TOPIC = "response-topic";

    // 这是一个 消费者容器,用于专门监听“响应消息”的 Topic(这里是 response-topic)。
    // 当 Producer 发送一条消息(请求)时,它会在消息头中指定一个 replyTopic(即 response-topic)。
    // Consumer 处理完后,会把结果发送回这个 Topic。
    // 这个容器就会收到那条“回复消息”。
    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(
    ConsumerFactory<String, String> consumerFactory) {

    // 指定监听的 Topic
    ContainerProperties containerProperties = new ContainerProperties(REPLY_TOPIC);

    // 重要:必须设置唯一的GroupId,避免干扰其他消费者
    containerProperties.setGroupId("reply-group-" + System.currentTimeMillis());

    // 后台自动轮询消费消息
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }

    // 定义ReplyingKafkaTemplate
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
    ProducerFactory<String, String> producerFactory,
    KafkaMessageListenerContainer<String, String> replyContainer) {

    // 第一个参数是常规的 ProducerFactory,第二个参数是监听回复 Topic 的容器
    ReplyingKafkaTemplate<String, String, String> template =
    new ReplyingKafkaTemplate<>(producerFactory, replyContainer);

    // 设置等待回复的超时时间 (例如 10 秒)
    template.setDefaultReplyTimeout(java.time.Duration.ofSeconds(10));

    return template;
    }

    // 因为有上面的kafka的Bean,不会再自动注入kafkaTemplate,需要手动注入
    @Bean
    public KafkaTemplate kafkaTemplate(
    ProducerFactory<String, String> producerFactory) {

    return new KafkaTemplate<>(producerFactory);
    }
    }
  2. 使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    @Slf4j
    @RestController
    public class HelloController {
    // 消息的key类型、消息的value类型、回复消息的value类型
    @Resource
    private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;

    @GetMapping("reply")
    public String sendRequestAndGetReply() throws ExecutionException, InterruptedException {
    // 构建请求消息
    // 在头部指定回复Topic
    String message = "hello";
    String topic = "topic-1";
    Message<String> requestMessage = MessageBuilder
    .withPayload(message)
    .setHeader(KafkaHeaders.TOPIC, topic) // 目标请求 Topic
    .setHeader(KafkaHeaders.REPLY_TOPIC, KafkaRequestReplyConfig.REPLY_TOPIC) // 回复 Topic
    .build();

    // 发送消息
    RequestReplyMessageFuture<String, String> future = replyingKafkaTemplate.sendAndReceive(requestMessage);

    // 阻塞等待回复消息
    Message<?> replyMessage = future.get();
    System.out.println(replyMessage.getPayload());

    return replyMessage.getPayload().toString();
    }

    @KafkaListener(topics = "topic-1", groupId = "handler-group")
    @SendTo // 自动将方法的返回值发送到消息头指定的回复Topic
    public String handleRequest(
    String request,
    @Header(KafkaHeaders.REPLY_TOPIC) String replyTopic,
    @Header(KafkaHeaders.CORRELATION_ID) byte[] correlationId) {

    System.out.println("收到请求: " + request);

    // ... 执行处理逻辑 ...

    // Spring Kafka 会自动处理回复消息的构建和相关 ID 的复制,
    // 只需要返回结果即可。
    return request.toUpperCase();
    }
    }
  • @**SendTo(“topic_name”)**:用于指定消息被发送到的目标 Topic
  • **@KafkaListener(topics = “topic_name”, groupId = “group_id”)**:用于标记一个方法作为Kafka 消费者
    • topics :指定监听的topic ,可以指定多个
    • groupId :消费者组是一组共享相同 Topic 的消费者的集合
    • containerFactory:指定创建消费者的Factory,来自定义消费者的配置
    • errorHandler:异常处理逻辑,实现KafkaListenerErrorHandler接口来创建处理异常handler

自定义消费者工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Bean
public ConsumerFactory<String, String> customConsumerFactory() {
Map<String, Object> props = new HashMap<>();
// Kafka broker 地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 消费组 ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-group");
// 关闭自动提交,让你能手动控制 offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// key 和 value 的反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 一次 poll 拉取的最大记录数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// 如果没有 offset,从最新的位置开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

return new DefaultKafkaConsumerFactory<>(props);
}

// 配合 @KafkaListener 使用的工厂
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> customConsumerFactory,
KafkaTemplate<String, String> kafkaTemplate) {

ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(customConsumerFactory);
// 控制并发度(相当于同组内多少个线程同时消费)
factory.setConcurrency(3);
// 开启批量消费(可选)
factory.setBatchListener(false);
// 开启手动提交偏移量
// 在消费者中加上参数Acknowledgment acknowledgment
// 然后acknowledgment.acknowledge();提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 不知道为什么自动注入的kafkaTemplate没识别到,这里手动设置
factory.setReplyTemplate(kafkaTemplate);

return factory;
}

Kafka
http://xwww12.github.io/2025/10/07/中间件/Kafka/
作者
xw
发布于
2025年10月7日
许可协议