Kafka
Kafka
Kafka 可以理解为一个高性能消息中间件,主要用来:解耦系统(不同服务之间用消息传递,不直接依赖)、削峰填谷(应对突发流量)、异步通信、流式数据处理
概念
Producer(生产者)
负责向 Kafka 发送消息
- 把消息发送到某个 Topic(主题)
- 发送时可以指定消息要写入哪个 Partition(分区)
- 通常是“异步发送”,效率高
Consumer(消费者)
从 Kafka 中读取消息
- 订阅一个或多个 Topic
- 消费的方式是拉取(pull)模式
- 多个消费者可以组成一个 Consumer Group(消费者组) 来分工处理数据
Topic(主题)
对消息进行分类和存储
- 每个主题可以有多个 Partition(分区)
- 生产者往主题里发消息,消费者从主题里读消息
- 比如有主题
order_log、user_behavior
Partition(分区)
Kafka 的高性能关键所在
- 每个 Topic 会被分成多个 Partition
- 每个 Partition 是一个有序、不可变的消息队列
- Partition 里的每条消息都有一个唯一的编号:offset(偏移量)
- Kafka 通过分区来实现并行处理和数据分布
Offset(偏移量)
消息在分区中的唯一位置编号
记录offset:Kafka 会为每个消费者组维护一个表<groupId、topic、partition、已消费到的 offset>
- 每条消息在分区中都有一个 offset
- 消费者会记录自己消费到哪里(比如 offset=120)
- 这个 offset 用来实现断点续传,保证消息的有序性和可靠性
Broker(代理服务器)
Kafka 的服务器节点
- 每台 Broker 就是一个 Kafka 实例
- 集群由多个Broker 组成
- 每个 Broker 存储若干个 Partition 的数据
Replication(副本机制)
Kafka 为每个分区保存多个副本
- 一个分区通常有 1 个 Leader 和若干个 Follower
- Producer 和 Consumer 只和 Leader 交互
- 如果 Leader 挂了,会由 Follower 自动接管
ZooKeeper
为了提高集群的可用性和稳定性, 架构中还会引入ZooKeeper, ZooKeeper用于维护Kafka集群中的Broker节点信息、Partition信息、Topic信息、集群管理、Leader选举、存储Metadata信息等
在Kafka中,每个Broker都会向ZooKeeper注册自己的节点信息,包括Broker ID、IP地址和端口号等。同时,每个Partition的Metadata信息也会存储在ZooKeeper中,包括该Partition的Replica信息、Leader信息、ISR信息等。当Broker加入或退出集群时,ZooKeeper会自动通知其他Broker更新集群的状态信息。在Leader选举时,ZooKeeper会根据预设的算法选举出新的Leader,并通知其他Broker更新Partition的状态信息。
主要架构

Producer将消息发送到Broker节点,Broker将消息存储到对应的Partition中- 每个Partition可以有多个Replica,其中一个Replica被选为Leader,其余Replica
为Follower - Leader负责处理消息的写操作,将消息追加到Partition中
- Follower负责与Leader保持同步,定期从Leader中拉取消息并复制到本地副本中,以保证数据的一致性
Consumer从Broker中读取消息,可以指定消费某个Topic中的指定Partition中的消息,也可以进行批量消费或实时消费Broker将消息存储在磁盘中,同时也会缓存部分消息到内存中,以提高读写性能
搭建Kafka集群
Kafka Assistant
官网:https://www.redisant.com/ka
测试、查看Kafka的GUI工具
在Linux的Docker上
三个Zookeeper、三个Kafka及一个Kafka-UI
1 | |
解释:
Zookeeper
1
2
3
4
5
6
7
8
9# 每个节点唯一编号
ZOOKEEPER_SERVER_ID: 1
# 这表示这三台 ZooKeeper 会通过 2888(同步端口)和 3888(选举端口)组成一个ZK集群
ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
# 数据挂载的位置
volumes:
- ./data/zookeeper/zoo1/data:/data
- ./data/zookeeper/zoo1/datalog:/datalogKafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22environment:
# 定义 Kafka 访问地址
# PLAINTEXT:当客户端(生产者、消费者、UI 等)连接 Kafka 时使用的接口,明文协议
# INTERNAL:容器内部通信(Kafka Broker 之间用)
# EXTERNAL:主机(宿主机)访问
# DOCKER:Docker 网络中的访问方式
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,INTERNAL://kafka1:19092
# 定义通信协议
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
# 指定 Broker 间通信通道
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# 连接 ZooKeeper 集群的地址
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
# 唯一编号
KAFKA_BROKER_ID: 1
# 系统级主题(__consumer_offsets)的副本数量配置,存储消费者的消费进度
# 建议有几个kafka就设置几
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
# 至少要有多少个副本同步成功,才能算写入成功
KAFKA_MIN_INSYNC_REPLICAS: 2
# 日志目录
KAFKA_LOG_DIRS: /kafka/data
在Docker Desktop上
保存为docker-compose.yml文件,执行docker-compose up -d命令安装,会在所在文件夹下挂载数据文件
通过kafka-ui的http://localhost:9999/可查看节点状态
一个Zookeeper、两个Kafka及一个Kafka-UI
1 | |
SpringBoot整合Kafka
快速使用
Spring Boot 版本为3.5.6
添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- fast-json -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.59</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>添加配置
1
2
3
4
5server:
port: 8081
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092, 127.0.0.1:9093使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21@Slf4j
@RestController
public class HelloController {
private static final String topic = "test";
@Resource
private KafkaTemplate<Object, Object> kafkaTemplate;
// 消费者,可指定消费者id、监听的topic
@KafkaListener(id = "helloGroup", topics = topic)
public void listen(String msg) {
log.info("hello receive value: {}", msg);
}
@GetMapping("hello")
public String hello() {
// 生产者,发送消息
kafkaTemplate.send(topic, "hello");
return "hello";
}
}
创建Topic
Topic 相当于一个命名的通道或类别,用于组织和存储特定的数据流。
- 生产者 (Producer) 将数据发布(写入)到一个特定的 Topic。
- 消费者 (Consumer) 从它订阅的一个或多个 Topic 中读取数据。
自动创建
向不存在的topic发送消息,会自动创建,
默认分区数(Partitions) = 1,副本数(Replica) = 1
手动创建
只要你在项目中引入了 Spring Kafka 依赖,并且配置了
spring.kafka.bootstrap-servers,Spring Boot 就会自动配置一个KafkaAdminBean。正是这个 Bean 负责扫描所有的NewTopicBean 并与 Kafka Broker 进行通信以创建 Topic
1 | |
有些高级设置需要通过.config()来配置
发送消息
简单异步发送
1 | |
同步获取发送结果
1 | |
异步获取发送结果
1 | |
消息的请求应答
在普通 Kafka 使用场景中,我们的消息是单向发送的:Producer → Topic → Consumer
而有些业务场景中需要请求-响应机制:
- 发送请求消息到 Kafka;
- 然后等待 Consumer 处理完后回发响应;
- Producer 收到响应再继续业务逻辑。
Spring Kafka 提供了一个专门的工具类ReplyingKafkaTemplate<K, V, R>来实现这种“请求—回复”通信
请求响应配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47@Configuration
public class KafkaRequestReplyConfig {
public static final String REPLY_TOPIC = "response-topic";
// 这是一个 消费者容器,用于专门监听“响应消息”的 Topic(这里是 response-topic)。
// 当 Producer 发送一条消息(请求)时,它会在消息头中指定一个 replyTopic(即 response-topic)。
// Consumer 处理完后,会把结果发送回这个 Topic。
// 这个容器就会收到那条“回复消息”。
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(
ConsumerFactory<String, String> consumerFactory) {
// 指定监听的 Topic
ContainerProperties containerProperties = new ContainerProperties(REPLY_TOPIC);
// 重要:必须设置唯一的GroupId,避免干扰其他消费者
containerProperties.setGroupId("reply-group-" + System.currentTimeMillis());
// 后台自动轮询消费消息
return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
// 定义ReplyingKafkaTemplate
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
ProducerFactory<String, String> producerFactory,
KafkaMessageListenerContainer<String, String> replyContainer) {
// 第一个参数是常规的 ProducerFactory,第二个参数是监听回复 Topic 的容器
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
// 设置等待回复的超时时间 (例如 10 秒)
template.setDefaultReplyTimeout(java.time.Duration.ofSeconds(10));
return template;
}
// 因为有上面的kafka的Bean,不会再自动注入kafkaTemplate,需要手动注入
@Bean
public KafkaTemplate kafkaTemplate(
ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45@Slf4j
@RestController
public class HelloController {
// 消息的key类型、消息的value类型、回复消息的value类型
@Resource
private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
@GetMapping("reply")
public String sendRequestAndGetReply() throws ExecutionException, InterruptedException {
// 构建请求消息
// 在头部指定回复Topic
String message = "hello";
String topic = "topic-1";
Message<String> requestMessage = MessageBuilder
.withPayload(message)
.setHeader(KafkaHeaders.TOPIC, topic) // 目标请求 Topic
.setHeader(KafkaHeaders.REPLY_TOPIC, KafkaRequestReplyConfig.REPLY_TOPIC) // 回复 Topic
.build();
// 发送消息
RequestReplyMessageFuture<String, String> future = replyingKafkaTemplate.sendAndReceive(requestMessage);
// 阻塞等待回复消息
Message<?> replyMessage = future.get();
System.out.println(replyMessage.getPayload());
return replyMessage.getPayload().toString();
}
@KafkaListener(topics = "topic-1", groupId = "handler-group")
@SendTo // 自动将方法的返回值发送到消息头指定的回复Topic
public String handleRequest(
String request,
@Header(KafkaHeaders.REPLY_TOPIC) String replyTopic,
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlationId) {
System.out.println("收到请求: " + request);
// ... 执行处理逻辑 ...
// Spring Kafka 会自动处理回复消息的构建和相关 ID 的复制,
// 只需要返回结果即可。
return request.toUpperCase();
}
}
- @**SendTo(“topic_name”)**:用于指定消息被发送到的目标 Topic
- **@KafkaListener(topics = “topic_name”, groupId = “group_id”)**:用于标记一个方法作为Kafka 消费者
- topics :指定监听的topic ,可以指定多个
- groupId :消费者组是一组共享相同
Topic的消费者的集合 - containerFactory:指定创建消费者的Factory,来自定义消费者的配置
- errorHandler:异常处理逻辑,实现
KafkaListenerErrorHandler接口来创建处理异常handler
自定义消费者工厂
1 | |