09MQ延迟消息
1.延迟消息的含义以及其作用
电商支付业务中,为保证用户体验,下单时会立即扣减库存。但用户若下单不付款,会长期占用资源,影响其他交易。
解决方案是:设置支付超时时间(如 30 分钟),超时未支付则取消订单并释放库存。
这类延时执行的任务称为延迟任务,最常用的实现方式是利用 MQ 的延迟消息。
如下图所示,流程是:交易服务创建订单后扣减库存,调用支付服务让用户支付,但是交易服务不知道用户是否支付了,所以就把一个消息放到MQ中,半个小时后发送这条信息到自己的服务中,根据消息进行查询数据库,查看是否支付了,如果没有支付,就删除订单,然后把扣减的库存加回来

实现延迟消息有两种方法:死信交换机、DelayExchange插件
2.死信交换机
什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因TTL(有效期)到期的消息
如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:

假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒,效果就成了先等待了5000毫秒,然后进入hmall.direct,然后发送了消息,这样就能实现延迟消息。

但是死信交换机是我们自己实现的,很麻烦,所以更推荐使用下面这种方法
3.DelayExchange插件
3.1DelayExchange下载
插件下载地址:rabbitmq/rabbitmq-delayed-message-exchange:RabbitMQ 的延迟消息传递,我们使用的rabbitMQ是3.8版本,所以插件下载3.8.17版本
也可以使用SpringCloud微服务—资料的day07的下载好的包。
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
docker volume inspect mq-plugins
结果如下:
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
接下来执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
运行结果如下:

3.2DelayExchange的使用
使用DelayExchange只需要在注释exchange中加入delayed = “true”
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
发送消息时,必须通过x-delay属性设定延迟时间:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
4.解决超时订单问题
4.1定义常量
无论是消息发送还是接收都是在交易服务完成,因此我们在trade-service中定义一个常量类,用于记录交换机、队列、RoutingKey等常量:
packagecom.hmall.trade.constants;
publicinterfaceMQConstants {
String DELAY_EXCHANGE_NAME= "trade.delay.direct";
String DELAY_ORDER_QUEUE_NAME= "trade.delay.order.queue";
String DELAY_ORDER_KEY= "delay.order.query";
}
4.2配置trade-service模块的MQ
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.150.101
port: 5672
virtual-host: /hmall
username: hmall
password: 123
4.3改造下单业务,发送延迟消息
修改trade-service模块的com.hmall.trade.service.impl.OrderServiceImpl类的createOrder方法,添加消息发送的代码:
这里延迟消息的时间应该是15分钟,不过我们为了测试方便,改成10秒。
4.4编写查询支付状态接口
首先,在hm-api模块定义三个类:
说明:
- PayOrderDTO:支付单的数据传输实体
- PayClient:支付系统的Feign客户端
- PayClientFallback:支付系统的fallback逻辑
PayOrderDTO代码如下:
package com.hmall.api.dto;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.time.LocalDateTime;
/**
* <p>
* 支付订单
* </p>
*/
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {
@ApiModelProperty("id")
private Long id;
@ApiModelProperty("业务订单号")
private Long bizOrderNo;
@ApiModelProperty("支付单号")
private Long payOrderNo;
@ApiModelProperty("支付用户id")
private Long bizUserId;
@ApiModelProperty("支付渠道编码")
private String payChannelCode;
@ApiModelProperty("支付金额,单位分")
private Integer amount;
@ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")
private Integer payType;
@ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")
private Integer status;
@ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")
private String expandJson;
@ApiModelProperty("第三方返回业务码")
private String resultCode;
@ApiModelProperty("第三方返回提示信息")
private String resultMsg;
@ApiModelProperty("支付成功时间")
private LocalDateTime paySuccessTime;
@ApiModelProperty("支付超时时间")
private LocalDateTime payOverTime;
@ApiModelProperty("支付二维码链接")
private String qrCodeUrl;
@ApiModelProperty("创建时间")
private LocalDateTime createTime;
@ApiModelProperty("更新时间")
private LocalDateTime updateTime;
}
PayClient代码如下:
package com.hmall.api.client;
import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {
/**
* 根据交易订单id查询支付单
* @param id 业务订单id
* @return 支付单信息
*/
@GetMapping("/pay-orders/biz/{id}")
PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
PayClientFallback代码如下:
package com.hmall.api.client.fallback;
import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;
@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {
@Override
public PayClient create(Throwable cause) {
return new PayClient() {
@Override
public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {
return null;
}
};
}
}
最后,在pay-service模块的PayController中实现该接口:
@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){
PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();
return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}
4.5监听消息,查询支付状态
接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:
代码如下:
packagecom.hmall.trade.listener;
importcom.hmall.api.client.PayClient;
importcom.hmall.api.dto.PayOrderDTO;
importcom.hmall.trade.constants.MQConstants;
importcom.hmall.trade.domain.po.Order;
importcom.hmall.trade.service.IOrderService;
importlombok.RequiredArgsConstructor;
importorg.springframework.amqp.rabbit.annotation.Exchange;
importorg.springframework.amqp.rabbit.annotation.Queue;
importorg.springframework.amqp.rabbit.annotation.QueueBinding;
importorg.springframework.amqp.rabbit.annotation.RabbitListener;
importorg.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
publicclassOrderDelayMessageListener {
privatefinalIOrderService orderService;
privatefinalPayClient payClient;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),
exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),
key = MQConstants.DELAY_ORDER_KEY))
publicvoidlistenOrderDelayMessage(Long orderId){
// 1.查询订单Order order = orderService.getById(orderId);
// 2.检测订单状态,判断是否已支付if(order == null|| order.getStatus() != 1){
// 订单不存在或者已经支付return;
}
// 3.未支付,需要查询支付流水状态PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);
// 4.判断是否支付if(payOrder != null&& payOrder.getStatus() == 3){
// 4.1.已支付,标记订单状态为已支付orderService.markOrderPaySuccess(orderId);
}else{
// TODO 4.2.未支付,取消订单,回复库存orderService.cancelOrder(orderId);
}
}
}
注意,这里要在OrderServiceImpl中实现cancelOrder方法,留给读者自行实现。