消息队列

MQ

Message Queue

微服务间通讯方式

微服务间通讯有同步和异步两种方式

优点 缺点
同步调用 时效性较强,可以立即得到结果 - 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
异步调用 - 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能

常见的MQ实现

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

RabbitMQ

通过Docker安装

  • 选择带有mangement的版本(带有管理界面)安装

rabbitmq - Official Image | Docker Hub

  • 执行命令启动容器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    docker run \
    -e RABBITMQ_DEFAULT_USER=root \
    -e RABBITMQ_DEFAULT_PASS=123456 \
    --name mq \
    --hostname mq1 \
    -p 15672:15672 \
    -p 5672:5672 \
    -d \
    rabbitmq:3-management
  • 通过15672端口打开管理界面

mq基本结构

mq基本结构

RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

小案例

通过rabbitmq来实现消息的返送和接收

简单消息队列模型

  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
    // 1.建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("192.168.229.128");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("root");
    factory.setPassword("123456");
    // 1.2.建立连接
    Connection connection = factory.newConnection();

    // 2.创建通道Channel
    Channel channel = connection.createChannel();

    // 3.创建队列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.发送消息
    String message = "hello, rabbitmq!";
    channel.basicPublish("", queueName, null, message.getBytes());
    System.out.println("发送消息成功:【" + message + "】");

    // 5.关闭通道和连接
    channel.close();
    connection.close();

    }
    }
  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
    // 1.建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("192.168.229.128");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("root");
    factory.setPassword("123456");
    // 1.2.建立连接
    Connection connection = factory.newConnection();

    // 2.创建通道Channel
    Channel channel = connection.createChannel();

    // 3.创建队列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.订阅消息
    channel.basicConsume(queueName, true, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
    AMQP.BasicProperties properties, byte[] body) throws IOException {
    // 5.处理消息
    String message = new String(body);
    System.out.println("接收到消息:【" + message + "】");
    }
    });
    System.out.println("等待接收消息。。。。");
    }
    }

SpringAMQP

Advanced Message Queuing Protocol(高级消息队列协议)

SpringAMQP: 基于AMQP定义的一套API规范

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

SimpleQueue(简单队列模型)

一个生产者和一个消费者

通过SpringAMQP来实现消息的返送和接收

  • 导入依赖

    1
    2
    3
    4
    5
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 编写配置

    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: 192.168.229.128
    virtual-host: /
    port: 5672
    username: root
    password: 123456
  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @EnableRabbit
    @SpringBootTest
    public class SpringAMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
    String queueName = "simple.queue"; #消息队列名
    String message = "testMessage"; #发送的消息
    rabbitTemplate.convertAndSend(queueName, message);
    }
    }
  • 消费者(同样要导入依赖、编写配置)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    //注入一个listener,自动监听队列
    @Component
    public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue") #监听的队列名
    public void listenerSimpleQueueMessage(String msg) {
    System.out.println(msg);
    }
    }
    //启动项目即可监听队列,有消息自动获取

WorkQueue(任务队列模型)

多个消费者绑定到一个队列,共同消费队列中的消息。

workQueue

  • 返送多条消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    public void testWorkQueue() {
    //发送50条消息
    for (int i = 0; i < 50; ++i) {
    String queueName = "simple.queue";
    String message = "testMessage" + i;
    rabbitTemplate.convertAndSend(queueName, message);
    }
    }
  • 多个消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @RabbitListener(queues = "simple.queue")
    public void listenerWorkQueueMessage1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20); //处理快
    }

    @RabbitListener(queues = "simple.queue")
    public void listenerWorkQueueMessage2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200); //处理慢
    }
  • 如果不配置的话会,消息是平均分配给每个消费者,并没有考虑到消费者的处理能力

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    spring:
    rabbitmq:
    host: 192.168.229.128
    virtual-host: /
    port: 5672
    username: root
    password: 123456
    listener:
    simple:
    prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

发布/订阅模型

发布订阅模型

Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。交换机只负责转发消息,不具备存储消息的能力。Exchange有以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Fanout交换机

消息会通过交换机广播到所有绑定队列

  • 声明队列和交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class FanoutConfig {
//声明交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("test.fanout");
}

//声明队列1
@Bean
public Queue queue1() {
return new Queue("fanout.queue1");
}

//声明队列2
@Bean
public Queue queue2() {
return new Queue("fanout.queue2");
}

//绑定交换机和队列1
@Bean
public Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}

//绑定交换机和队列2
@Bean
public Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}

  • 声明生产者

    1
    2
    3
    4
    5
    6
    7
    @Test
    public void testFanoutExchange() {
    //发送到交换机
    String exchangeName = "test.fanout";
    String message = "testMessage";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
  • 声明消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Component
    public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenerWorkQueueMessage1(String msg) {
    System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenerWorkQueueMessage2(String msg) {
    System.err.println("消费者2........接收到消息:【" + msg + "】");
    }
    }

Direct交换机

通过key定向向队列发送消息

  • 通过注解在listener上声明队列和交换机

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Component
    public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"), //绑定队列
    exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT), //绑定交换机
    key = {"red", "blue"} //绑定key,只有队列有和消息相同的key时,才会收到消息
    ))
    public void listenerDirectQueue1(String msg) {
    System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"), //绑定队列
    exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT), //绑定交换机
    key = {"red", "yellow"} //绑定key,只有队列有和消息相同的key时,才会收到消息
    ))
    public void listenerDirectQueue2(String msg) {
    System.out.println("消费者2接收到消息:【" + msg + "】");
    }
    }
  • 发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    @Test
    public void testDirectExchange() {
    //发送到交换机
    String exchangeName = "test.direct";
    String message = "testMessage";
    //只有有"yellow"的队列会收到消息
    rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
    }

Topic

通过通配符#(一个或多个)和.(一个)来匹配队列

  • 通过注解在listener上声明队列和交换机

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    //话题交换机
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"), //绑定队列
    exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC), //绑定交换机
    key = "china.#" //绑定通配符,只有满足格式的消息,才会接收
    ))
    public void listenerTopicQueue1(String msg) {
    System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"), //绑定队列
    exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC), //绑定交换机
    key = "#.news" //绑定通配符,只有满足格式的消息,才会接收
    ))
    public void listenerTopicQueue2(String msg) {
    System.out.println("消费者2接收到消息:【" + msg + "】");
    }
  • 发送消息

    1
    2
    3
    4
    5
    6
    7
    @Test
    public void testTopicExchange() {
    //发送到交换机
    String exchangeName = "test.topic";
    String message = "testMessage";
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
    }

配置消息转换器

  • 导入jackson转换器依赖

    1
    2
    3
    4
    5
    <!--jackson消息转换器-->
    <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    </dependency>
  • 声明转换器来替换原有的转换器

    1
    2
    3
    4
    @Bean
    public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
    }

消息队列
http://xwww12.github.io/2022/09/11/微服务/MQ/
作者
xw
发布于
2022年9月11日
许可协议