RabbitMQ
RabbitMQ
RabbitMQ是一个被广泛使用的开源消息队列。它是轻量级且易于部署的,它能支持多种消息协议。RabbitMQ可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。
安装
windows
首先需要安装Erlang环境
Erlang版本和RabbitMQ版本对照:RabbitMQ Erlang Version Requirements — RabbitMQ
下载:Downloads - Erlang/OTP 选择安装路径后一路next
安装RabbitMQ
下载:Downloading and Installing RabbitMQ — RabbitMQ 选择安装路径后一路next
下载完进入sbin目录,cmd窗口输入
1
rabbitmq-plugins enable rabbitmq_management
下载插件
点击sbin目录下的rabbitmq-service.bat启动
访问:http://localhost:15672 打开控制面板,默认账户:guest,默认密码:guest
Linux
见微服务/消息队列
学习
详细见微服务/消息队列
SpringBoot整合RabbitMQ
实例
功能:下订单时发送一条消息到延时队列中,若超时未付款则转发消息到真正取消订单的消息队列中
在RabbitMQ的控制面板创建用户,账号密码为mall,创建虚拟目录/mall,给mall用户/mall的所有权限
导入坐标
1
2
3
4
5<!--AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>编写配置
1
2
3
4
5
6
7spring:
rabbitmq:
host: localhost # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
virtual-host: /mall # rabbitmq的虚拟host
username: mall # rabbitmq的用户名
password: mall # rabbitmq的密码编写配置类,用于创建交换机和队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package com.xw.mallLearning.dto;
@Getter
public enum QueueEnum {
/**
* 消息通知队列
*/
QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
/**
* 消息通知ttl队列
*/
QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");
/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;
QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73package com.xw.mallLearning.config;
/**
* 定义交换机和消息队列
*/
@Configuration
public class RabbitMqConfig {
/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
public DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) // 定向交换机,设定名称
.durable(true) // 是否持久化
.build();
}
/**
* 订单延迟队列队列所绑定的交换机
*/
@Bean
public DirectExchange orderTTLDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) // 定向交换机,设定名称
.durable(true) // 是否持久化
.build();
}
/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}
/**
* 订单延迟队列(死信队列)
* 当订单超时后被转发到订单队列
*/
@Bean
public Queue orderTTLQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
.build();
}
/**
* 将订单队列绑定到交换机
*/
@Bean
public Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}
/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTTLDirect,Queue orderTTLQueue){
return BindingBuilder
.bind(orderTTLQueue)
.to(orderTTLDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}
}编写消息的生产者和消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29package com.xw.mallLearning.component;
/**
* 取消订单消息生产者
*/
@Slf4j
@Component
public class CancelOrderSender {
@Resource
private AmqpTemplate amqpTemplate;
public void sendMessage(Long orderId, final Long delayTime) {
// 消息处理器,设置超时时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(String.valueOf(delayTime));
return message;
}
};
// 发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), // 发送到的交换机
QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), // 路由Key
orderId, // 消息
messagePostProcessor // 消息处理器
);
log.info("订单id:{}", orderId);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17package com.xw.mallLearning.component;
/**
* 取消订单消息消费者
*/
@Slf4j
@Component
public class CancelOrderReceiver {
@Resource
private OmsPortalOrderService omsPortalOrderService;
@RabbitListener(queues = "mall.order.cancel")
public void handle(Long orderId) {
omsPortalOrderService.cancelOrder(orderId);
log.info("处理取消订单消息的id:{}", orderId);
}
}service层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28@Slf4j
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
@Resource
private CancelOrderSender cancelOrderSender;
@Override
public CommonResult generateOrder(OrderParam orderParam) {
// 下单操作,生成订单和订单id
log.info("下单操作....");
Long orderId = 10L;
// 下完单后发送一个延迟消息,若期间没有付款则取消订单
sendDelayMessageCancelOrder(orderId);
return CommonResult.success(null, "下单成功");
}
@Override
public void cancelOrder(Long orderId) {
log.info("取消订单,id:{}", orderId);
}
private void sendDelayMessageCancelOrder(Long orderId) {
// 30秒
Long delayTimes = 30 * 1000L;
cancelOrderSender.sendMessage(orderId, delayTimes);
}
}controller层
1
2
3
4
5
6
7
8
9
10
11
12@Api(tags = "订单管理")
@RestController("/order")
public class OmsPortalOrderController {
@Resource
private OmsPortalOrderService omsPortalOrderService;
@ApiOperation("根据购物车信息生成订单")
@PostMapping("/generateOrder")
public CommonResult generateOrder(@RequestBody OrderParam orderParam) {
return omsPortalOrderService.generateOrder(orderParam);
}
}
RabbitMQ
http://xwww12.github.io/2023/02/15/中间件/RabbitMQ/