消息队列
MQ
Message Queue
微服务间通讯方式
微服务间通讯有同步和异步两种方式
优点 | 缺点 | |
---|---|---|
同步调用 | 时效性较强,可以立即得到结果 | - 耦合度高 - 性能和吞吐能力下降 - 有额外的资源消耗 - 有级联失败问题 |
异步调用 | - 吞吐量提升:无需等待订阅者处理完成,响应更快速 - 故障隔离:服务没有直接调用,不存在级联失败问题 - 调用间没有阻塞,不会造成无效的资源占用 - 耦合度极低,每个服务都可以灵活插拔,可替换 - 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件 |
- 架构复杂了,业务没有明显的流程线,不好管理 - 需要依赖于Broker的可靠、安全、性能 |
常见的MQ实现
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ
通过Docker安装
- 选择带有mangement的版本(带有管理界面)安装
rabbitmq - Official Image | Docker Hub
执行命令启动容器
1
2
3
4
5
6
7
8
9docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management通过15672端口打开管理界面
mq基本结构
RabbitMQ中的一些角色:
- publisher:生产者
- consumer:消费者
- exchange个:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
小案例
通过rabbitmq来实现消息的返送和接收
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.229.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.229.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
SpringAMQP
Advanced Message Queuing Protocol(高级消息队列协议)
SpringAMQP: 基于AMQP定义的一套API规范
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
SimpleQueue(简单队列模型)
一个生产者和一个消费者
通过SpringAMQP来实现消息的返送和接收
导入依赖
1
2
3
4
5<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>编写配置
1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.229.128
virtual-host: /
port: 5672
username: root
password: 123456生产者
1
2
3
4
5
6
7
8
9
10
11
12
13@EnableRabbit
@SpringBootTest
public class SpringAMQPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue"; #消息队列名
String message = "testMessage"; #发送的消息
rabbitTemplate.convertAndSend(queueName, message);
}
}消费者(同样要导入依赖、编写配置)
1
2
3
4
5
6
7
8
9//注入一个listener,自动监听队列
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") #监听的队列名
public void listenerSimpleQueueMessage(String msg) {
System.out.println(msg);
}
}
//启动项目即可监听队列,有消息自动获取
WorkQueue(任务队列模型)
让多个消费者绑定到一个队列,共同消费队列中的消息。
返送多条消息
1
2
3
4
5
6
7
8
9@Test
public void testWorkQueue() {
//发送50条消息
for (int i = 0; i < 50; ++i) {
String queueName = "simple.queue";
String message = "testMessage" + i;
rabbitTemplate.convertAndSend(queueName, message);
}
}多个消费者
1
2
3
4
5
6
7
8
9
10
11@RabbitListener(queues = "simple.queue")
public void listenerWorkQueueMessage1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20); //处理快
}
@RabbitListener(queues = "simple.queue")
public void listenerWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200); //处理慢
}如果不配置的话会,消息是平均分配给每个消费者,并没有考虑到消费者的处理能力
1
2
3
4
5
6
7
8
9
10spring:
rabbitmq:
host: 192.168.229.128
virtual-host: /
port: 5672
username: root
password: 123456
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
发布/订阅模型
Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。交换机只负责转发消息,不具备存储消息的能力。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Fanout交换机
消息会通过交换机广播到所有绑定队列
- 声明队列和交换机
1 |
|
声明生产者
1
2
3
4
5
6
7@Test
public void testFanoutExchange() {
//发送到交换机
String exchangeName = "test.fanout";
String message = "testMessage";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}声明消费者
1
2
3
4
5
6
7
8
9
10
11
12@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenerWorkQueueMessage1(String msg) {
System.out.println("消费者1接收到消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenerWorkQueueMessage2(String msg) {
System.err.println("消费者2........接收到消息:【" + msg + "】");
}
}
Direct交换机
通过key定向向队列发送消息
通过注解在listener上声明队列和交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"), //绑定队列
exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT), //绑定交换机
key = {"red", "blue"} //绑定key,只有队列有和消息相同的key时,才会收到消息
))
public void listenerDirectQueue1(String msg) {
System.out.println("消费者1接收到消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"), //绑定队列
exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT), //绑定交换机
key = {"red", "yellow"} //绑定key,只有队列有和消息相同的key时,才会收到消息
))
public void listenerDirectQueue2(String msg) {
System.out.println("消费者2接收到消息:【" + msg + "】");
}
}发送消息
1
2
3
4
5
6
7
8@Test
public void testDirectExchange() {
//发送到交换机
String exchangeName = "test.direct";
String message = "testMessage";
//只有有"yellow"的队列会收到消息
rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
}
Topic
通过通配符#(一个或多个)和.(一个)来匹配队列
通过注解在listener上声明队列和交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18//话题交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"), //绑定队列
exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC), //绑定交换机
key = "china.#" //绑定通配符,只有满足格式的消息,才会接收
))
public void listenerTopicQueue1(String msg) {
System.out.println("消费者1接收到消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"), //绑定队列
exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC), //绑定交换机
key = "#.news" //绑定通配符,只有满足格式的消息,才会接收
))
public void listenerTopicQueue2(String msg) {
System.out.println("消费者2接收到消息:【" + msg + "】");
}发送消息
1
2
3
4
5
6
7@Test
public void testTopicExchange() {
//发送到交换机
String exchangeName = "test.topic";
String message = "testMessage";
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
配置消息转换器
导入jackson转换器依赖
1
2
3
4
5<!--jackson消息转换器-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>声明转换器来替换原有的转换器
1
2
3
4@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
消息队列
http://xwww12.github.io/2022/09/11/微服务/MQ/