RabbitMQ

RabbitMQ

RabbitMQ是一个被广泛使用的开源消息队列。它是轻量级且易于部署的,它能支持多种消息协议。RabbitMQ可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。

安装

windows

  1. 首先需要安装Erlang环境

  2. 安装RabbitMQ

Linux

见微服务/消息队列

学习

详细见微服务/消息队列

SpringBoot整合RabbitMQ

实例

功能:下订单时发送一条消息到延时队列中,若超时未付款则转发消息到真正取消订单的消息队列中

  1. 在RabbitMQ的控制面板创建用户,账号密码为mall,创建虚拟目录/mall,给mall用户/mall的所有权限

  2. 导入坐标

    1
    2
    3
    4
    5
    <!--AMQP-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  3. 编写配置

    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: localhost # rabbitmq的连接地址
    port: 5672 # rabbitmq的连接端口号
    virtual-host: /mall # rabbitmq的虚拟host
    username: mall # rabbitmq的用户名
    password: mall # rabbitmq的密码
  4. 编写配置类,用于创建交换机和队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package com.xw.mallLearning.dto;

    @Getter
    public enum QueueEnum {
    /**
    * 消息通知队列
    */
    QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
    /**
    * 消息通知ttl队列
    */
    QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");

    /**
    * 交换名称
    */
    private String exchange;
    /**
    * 队列名称
    */
    private String name;
    /**
    * 路由键
    */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
    this.exchange = exchange;
    this.name = name;
    this.routeKey = routeKey;
    }
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    package com.xw.mallLearning.config;

    /**
    * 定义交换机和消息队列
    */
    @Configuration
    public class RabbitMqConfig {
    /**
    * 订单消息实际消费队列所绑定的交换机
    */
    @Bean
    public DirectExchange orderDirect() {
    return (DirectExchange) ExchangeBuilder
    .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) // 定向交换机,设定名称
    .durable(true) // 是否持久化
    .build();
    }

    /**
    * 订单延迟队列队列所绑定的交换机
    */
    @Bean
    public DirectExchange orderTTLDirect() {
    return (DirectExchange) ExchangeBuilder
    .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) // 定向交换机,设定名称
    .durable(true) // 是否持久化
    .build();
    }

    /**
    * 订单实际消费队列
    */
    @Bean
    public Queue orderQueue() {
    return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
    }

    /**
    * 订单延迟队列(死信队列)
    * 当订单超时后被转发到订单队列
    */
    @Bean
    public Queue orderTTLQueue() {
    return QueueBuilder
    .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
    .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
    .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
    .build();
    }

    /**
    * 将订单队列绑定到交换机
    */
    @Bean
    public Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){
    return BindingBuilder
    .bind(orderQueue)
    .to(orderDirect)
    .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
    }

    /**
    * 将订单延迟队列绑定到交换机
    */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTTLDirect,Queue orderTTLQueue){
    return BindingBuilder
    .bind(orderTTLQueue)
    .to(orderTTLDirect)
    .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
    }
    }

  5. 编写消息的生产者和消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    package com.xw.mallLearning.component;

    /**
    * 取消订单消息生产者
    */
    @Slf4j
    @Component
    public class CancelOrderSender {
    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId, final Long delayTime) {
    // 消息处理器,设置超时时间
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
    message.getMessageProperties().setExpiration(String.valueOf(delayTime));
    return message;
    }
    };
    // 发送消息
    amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), // 发送到的交换机
    QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), // 路由Key
    orderId, // 消息
    messagePostProcessor // 消息处理器
    );
    log.info("订单id:{}", orderId);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    package com.xw.mallLearning.component;

    /**
    * 取消订单消息消费者
    */
    @Slf4j
    @Component
    public class CancelOrderReceiver {
    @Resource
    private OmsPortalOrderService omsPortalOrderService;

    @RabbitListener(queues = "mall.order.cancel")
    public void handle(Long orderId) {
    omsPortalOrderService.cancelOrder(orderId);
    log.info("处理取消订单消息的id:{}", orderId);
    }
    }
  6. service层

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    @Slf4j
    @Service
    public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {

    @Resource
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(OrderParam orderParam) {
    // 下单操作,生成订单和订单id
    log.info("下单操作....");
    Long orderId = 10L;
    // 下完单后发送一个延迟消息,若期间没有付款则取消订单
    sendDelayMessageCancelOrder(orderId);
    return CommonResult.success(null, "下单成功");
    }

    @Override
    public void cancelOrder(Long orderId) {
    log.info("取消订单,id:{}", orderId);
    }

    private void sendDelayMessageCancelOrder(Long orderId) {
    // 30秒
    Long delayTimes = 30 * 1000L;
    cancelOrderSender.sendMessage(orderId, delayTimes);
    }
    }
  7. controller层

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Api(tags = "订单管理")
    @RestController("/order")
    public class OmsPortalOrderController {
    @Resource
    private OmsPortalOrderService omsPortalOrderService;

    @ApiOperation("根据购物车信息生成订单")
    @PostMapping("/generateOrder")
    public CommonResult generateOrder(@RequestBody OrderParam orderParam) {
    return omsPortalOrderService.generateOrder(orderParam);
    }
    }

RabbitMQ
http://xwww12.github.io/2023/02/15/中间件/RabbitMQ/
作者
xw
发布于
2023年2月15日
许可协议