Kafka
Kafka
Kafka 可以理解为一个高性能消息中间件,主要用来:解耦系统(不同服务之间用消息传递,不直接依赖)、削峰填谷(应对突发流量)、异步通信、流式数据处理
概念
Producer(生产者)
负责向 Kafka 发送消息
- 把消息发送到某个 Topic(主题)
- 发送时可以指定消息要写入哪个 Partition(分区)
- 通常是“异步发送”,效率高
Consumer(消费者)
从 Kafka 中读取消息
- 订阅一个或多个 Topic
- 消费的方式是拉取(pull)模式
- 多个消费者可以组成一个 Consumer Group(消费者组) 来分工处理数据
Topic(主题)
对消息进行分类和存储
- 每个主题可以有多个 Partition(分区)
- 生产者往主题里发消息,消费者从主题里读消息
- 比如有主题
order_log
、user_behavior
Partition(分区)
Kafka 的高性能关键所在
- 每个 Topic 会被分成多个 Partition
- 每个 Partition 是一个有序、不可变的消息队列
- Partition 里的每条消息都有一个唯一的编号:offset(偏移量)
- Kafka 通过分区来实现并行处理和数据分布
Offset(偏移量)
消息在分区中的唯一位置编号
记录offset:Kafka 会为每个消费者组维护一个表<groupId、topic、partition、已消费到的 offset>
- 每条消息在分区中都有一个 offset
- 消费者会记录自己消费到哪里(比如 offset=120)
- 这个 offset 用来实现断点续传,保证消息的有序性和可靠性
Broker(代理服务器)
Kafka 的服务器节点
- 每台 Broker 就是一个 Kafka 实例
- 集群由多个Broker 组成
- 每个 Broker 存储若干个 Partition 的数据
Replication(副本机制)
Kafka 为每个分区保存多个副本
- 一个分区通常有 1 个 Leader 和若干个 Follower
- Producer 和 Consumer 只和 Leader 交互
- 如果 Leader 挂了,会由 Follower 自动接管
ZooKeeper
为了提高集群的可用性和稳定性
, 架构中还会引入ZooKeeper
, ZooKeeper
用于维护Kafka集群
中的Broker节点
信息、Partition
信息、Topic
信息、集群管理、Leader选举、存储Metadata信息等
在Kafka
中,每个Broker
都会向ZooKeeper
注册自己的节点信息,包括Broker ID、IP地址和端口号
等。同时,每个Partition
的Metadata
信息也会存储在ZooKeeper
中,包括该Partition
的Replica
信息、Leader
信息、ISR
信息等。当Broker
加入或退出集群时,ZooKeeper
会自动通知其他Broker
更新集群的状态信息。在Leader
选举时,ZooKeeper
会根据预设的算法选举出新的Leader
,并通知其他Broker
更新Partition
的状态信息。
主要架构
Producer
将消息发送到Broker
节点,Broker
将消息存储到对应的Partition
中- 每个Partition可以有多个Replica,其中一个Replica被选为Leader,其余Replica
为
Follower - Leader负责处理消息的写操作,将消息追加到Partition中
- Follower负责与Leader保持同步,定期从Leader中拉取消息并复制到本地副本中,以保证数据的一致性
Consumer
从Broker
中读取消息,可以指定消费某个Topic
中的指定Partition
中的消息,也可以进行批量消费或实时消费Broker
将消息存储在磁盘中,同时也会缓存部分消息到内存中,以提高读写性能
搭建Kafka集群
Kafka Assistant
官网:https://www.redisant.com/ka
测试、查看Kafka的GUI工具
在Linux的Docker上
三个Zookeeper、三个Kafka及一个Kafka-UI
1 |
|
解释:
Zookeeper
1
2
3
4
5
6
7
8
9# 每个节点唯一编号
ZOOKEEPER_SERVER_ID: 1
# 这表示这三台 ZooKeeper 会通过 2888(同步端口)和 3888(选举端口)组成一个ZK集群
ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
# 数据挂载的位置
volumes:
- ./data/zookeeper/zoo1/data:/data
- ./data/zookeeper/zoo1/datalog:/datalogKafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22environment:
# 定义 Kafka 访问地址
# PLAINTEXT:当客户端(生产者、消费者、UI 等)连接 Kafka 时使用的接口,明文协议
# INTERNAL:容器内部通信(Kafka Broker 之间用)
# EXTERNAL:主机(宿主机)访问
# DOCKER:Docker 网络中的访问方式
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,INTERNAL://kafka1:19092
# 定义通信协议
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
# 指定 Broker 间通信通道
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# 连接 ZooKeeper 集群的地址
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
# 唯一编号
KAFKA_BROKER_ID: 1
# 系统级主题(__consumer_offsets)的副本数量配置,存储消费者的消费进度
# 建议有几个kafka就设置几
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
# 至少要有多少个副本同步成功,才能算写入成功
KAFKA_MIN_INSYNC_REPLICAS: 2
# 日志目录
KAFKA_LOG_DIRS: /kafka/data
在Docker Desktop上
保存为docker-compose.yml
文件,执行docker-compose up -d
命令安装,会在所在文件夹下挂载数据文件
通过kafka-ui的http://localhost:9999/
可查看节点状态
一个Zookeeper、两个Kafka及一个Kafka-UI
1 |
|
SpringBoot整合Kafka
快速使用
Spring Boot 版本为3.5.6
添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- fast-json -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.59</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>添加配置
1
2
3
4
5server:
port: 8081
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092, 127.0.0.1:9093使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21@Slf4j
@RestController
public class HelloController {
private static final String topic = "test";
@Resource
private KafkaTemplate<Object, Object> kafkaTemplate;
// 消费者,可指定消费者id、监听的topic
@KafkaListener(id = "helloGroup", topics = topic)
public void listen(String msg) {
log.info("hello receive value: {}", msg);
}
@GetMapping("hello")
public String hello() {
// 生产者,发送消息
kafkaTemplate.send(topic, "hello");
return "hello";
}
}
创建Topic
Topic 相当于一个命名的通道或类别,用于组织和存储特定的数据流。
- 生产者 (Producer) 将数据发布(写入)到一个特定的 Topic。
- 消费者 (Consumer) 从它订阅的一个或多个 Topic 中读取数据。
自动创建
向不存在的topic发送消息,会自动创建,
默认分区数(Partitions) = 1,副本数(Replica) = 1
手动创建
只要你在项目中引入了 Spring Kafka 依赖,并且配置了
spring.kafka.bootstrap-servers
,Spring Boot 就会自动配置一个KafkaAdmin
Bean。正是这个 Bean 负责扫描所有的NewTopic
Bean 并与 Kafka Broker 进行通信以创建 Topic
1 |
|
有些高级设置需要通过.config()
来配置
发送消息
简单异步发送
1 |
|
同步获取发送结果
1 |
|
异步获取发送结果
1 |
|
消息的请求应答
在普通 Kafka 使用场景中,我们的消息是单向发送的:Producer → Topic → Consumer
而有些业务场景中需要请求-响应机制:
- 发送请求消息到 Kafka;
- 然后等待 Consumer 处理完后回发响应;
- Producer 收到响应再继续业务逻辑。
Spring Kafka 提供了一个专门的工具类ReplyingKafkaTemplate<K, V, R>
来实现这种“请求—回复”通信
请求响应配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47@Configuration
public class KafkaRequestReplyConfig {
public static final String REPLY_TOPIC = "response-topic";
// 这是一个 消费者容器,用于专门监听“响应消息”的 Topic(这里是 response-topic)。
// 当 Producer 发送一条消息(请求)时,它会在消息头中指定一个 replyTopic(即 response-topic)。
// Consumer 处理完后,会把结果发送回这个 Topic。
// 这个容器就会收到那条“回复消息”。
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(
ConsumerFactory<String, String> consumerFactory) {
// 指定监听的 Topic
ContainerProperties containerProperties = new ContainerProperties(REPLY_TOPIC);
// 重要:必须设置唯一的GroupId,避免干扰其他消费者
containerProperties.setGroupId("reply-group-" + System.currentTimeMillis());
// 后台自动轮询消费消息
return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
// 定义ReplyingKafkaTemplate
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
ProducerFactory<String, String> producerFactory,
KafkaMessageListenerContainer<String, String> replyContainer) {
// 第一个参数是常规的 ProducerFactory,第二个参数是监听回复 Topic 的容器
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
// 设置等待回复的超时时间 (例如 10 秒)
template.setDefaultReplyTimeout(java.time.Duration.ofSeconds(10));
return template;
}
// 因为有上面的kafka的Bean,不会再自动注入kafkaTemplate,需要手动注入
@Bean
public KafkaTemplate kafkaTemplate(
ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45@Slf4j
@RestController
public class HelloController {
// 消息的key类型、消息的value类型、回复消息的value类型
@Resource
private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
@GetMapping("reply")
public String sendRequestAndGetReply() throws ExecutionException, InterruptedException {
// 构建请求消息
// 在头部指定回复Topic
String message = "hello";
String topic = "topic-1";
Message<String> requestMessage = MessageBuilder
.withPayload(message)
.setHeader(KafkaHeaders.TOPIC, topic) // 目标请求 Topic
.setHeader(KafkaHeaders.REPLY_TOPIC, KafkaRequestReplyConfig.REPLY_TOPIC) // 回复 Topic
.build();
// 发送消息
RequestReplyMessageFuture<String, String> future = replyingKafkaTemplate.sendAndReceive(requestMessage);
// 阻塞等待回复消息
Message<?> replyMessage = future.get();
System.out.println(replyMessage.getPayload());
return replyMessage.getPayload().toString();
}
@KafkaListener(topics = "topic-1", groupId = "handler-group")
@SendTo // 自动将方法的返回值发送到消息头指定的回复Topic
public String handleRequest(
String request,
@Header(KafkaHeaders.REPLY_TOPIC) String replyTopic,
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlationId) {
System.out.println("收到请求: " + request);
// ... 执行处理逻辑 ...
// Spring Kafka 会自动处理回复消息的构建和相关 ID 的复制,
// 只需要返回结果即可。
return request.toUpperCase();
}
}
- @**SendTo(“topic_name”)**:用于指定消息被发送到的目标 Topic
- **@KafkaListener(topics = “topic_name”, groupId = “group_id”)**:用于标记一个方法作为Kafka 消费者
- topics :指定监听的topic ,可以指定多个
- groupId :消费者组是一组共享相同
Topic
的消费者的集合 - containerFactory:指定创建消费者的Factory,来自定义消费者的配置
- errorHandler:异常处理逻辑,实现
KafkaListenerErrorHandler
接口来创建处理异常handler
自定义消费者工厂
1 |
|