09MQ延迟消息

1.延迟消息的含义以及其作用

电商支付业务中,为保证用户体验,下单时会立即扣减库存。但用户若下单不付款,会长期占用资源,影响其他交易。
解决方案是:设置支付超时时间(如 30 分钟),超时未支付则取消订单并释放库存。
这类延时执行的任务称为延迟任务,最常用的实现方式是利用 MQ 的延迟消息。

如下图所示,流程是:交易服务创建订单后扣减库存,调用支付服务让用户支付,但是交易服务不知道用户是否支付了,所以就把一个消息放到MQ中,半个小时后发送这条信息到自己的服务中,根据消息进行查询数据库,查看是否支付了,如果没有支付,就删除订单,然后把扣减的库存加回来

实现延迟消息有两种方法:死信交换机、DelayExchange插件

2.死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:

假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒,效果就成了先等待了5000毫秒,然后进入hmall.direct,然后发送了消息,这样就能实现延迟消息。

但是死信交换机是我们自己实现的,很麻烦,所以更推荐使用下面这种方法

3.DelayExchange插件

3.1DelayExchange下载

插件下载地址:rabbitmq/rabbitmq-delayed-message-exchange:RabbitMQ 的延迟消息传递,我们使用的rabbitMQ是3.8版本,所以插件下载3.8.17版本

也可以使用SpringCloud微服务—资料的day07的下载好的包。
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:

[
    {
        "CreatedAt": "2024-06-19T09:22:59+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:

3.2DelayExchange的使用

使用DelayExchange只需要在注释exchange中加入delayed = “true”

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

发送消息时,必须通过x-delay属性设定延迟时间:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

4.解决超时订单问题

4.1定义常量

无论是消息发送还是接收都是在交易服务完成,因此我们在trade-service中定义一个常量类,用于记录交换机、队列、RoutingKey等常量:

packagecom.hmall.trade.constants;

publicinterfaceMQConstants {
    String DELAY_EXCHANGE_NAME= "trade.delay.direct";
    String DELAY_ORDER_QUEUE_NAME= "trade.delay.order.queue";
    String DELAY_ORDER_KEY= "delay.order.query";
}

4.2配置trade-service模块的MQ

  <!--amqp-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
spring:
  rabbitmq:
    host: 192.168.150.101
    port: 5672
    virtual-host: /hmall
    username: hmall
    password: 123

4.3改造下单业务,发送延迟消息

修改trade-service模块的com.hmall.trade.service.impl.OrderServiceImpl类的createOrder方法,添加消息发送的代码:

这里延迟消息的时间应该是15分钟,不过我们为了测试方便,改成10秒。

4.4编写查询支付状态接口

首先,在hm-api模块定义三个类:

说明:

  • PayOrderDTO:支付单的数据传输实体
  • PayClient:支付系统的Feign客户端
  • PayClientFallback:支付系统的fallback逻辑

PayOrderDTO代码如下:

package com.hmall.api.dto;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * <p>
 * 支付订单
 * </p>
 */
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {
    @ApiModelProperty("id")
    private Long id;
    @ApiModelProperty("业务订单号")
    private Long bizOrderNo;
    @ApiModelProperty("支付单号")
    private Long payOrderNo;
    @ApiModelProperty("支付用户id")
    private Long bizUserId;
    @ApiModelProperty("支付渠道编码")
    private String payChannelCode;
    @ApiModelProperty("支付金额,单位分")
    private Integer amount;
    @ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")
    private Integer payType;
    @ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")
    private Integer status;
    @ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")
    private String expandJson;
    @ApiModelProperty("第三方返回业务码")
    private String resultCode;
    @ApiModelProperty("第三方返回提示信息")
    private String resultMsg;
    @ApiModelProperty("支付成功时间")
    private LocalDateTime paySuccessTime;
    @ApiModelProperty("支付超时时间")
    private LocalDateTime payOverTime;
    @ApiModelProperty("支付二维码链接")
    private String qrCodeUrl;
    @ApiModelProperty("创建时间")
    private LocalDateTime createTime;
    @ApiModelProperty("更新时间")
    private LocalDateTime updateTime;
}

PayClient代码如下:

package com.hmall.api.client;

import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {
    /**
     * 根据交易订单id查询支付单
     * @param id 业务订单id
     * @return 支付单信息
     */
    @GetMapping("/pay-orders/biz/{id}")
    PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}

PayClientFallback代码如下:

package com.hmall.api.client.fallback;

import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;

@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {
    @Override
    public PayClient create(Throwable cause) {
        return new PayClient() {
            @Override
            public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {
                return null;
            }
        };
    }
}

最后,在pay-service模块的PayController中实现该接口:

@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){
    PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();
    return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

4.5监听消息,查询支付状态

接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:

代码如下:

packagecom.hmall.trade.listener;

importcom.hmall.api.client.PayClient;
importcom.hmall.api.dto.PayOrderDTO;
importcom.hmall.trade.constants.MQConstants;
importcom.hmall.trade.domain.po.Order;
importcom.hmall.trade.service.IOrderService;
importlombok.RequiredArgsConstructor;
importorg.springframework.amqp.rabbit.annotation.Exchange;
importorg.springframework.amqp.rabbit.annotation.Queue;
importorg.springframework.amqp.rabbit.annotation.QueueBinding;
importorg.springframework.amqp.rabbit.annotation.RabbitListener;
importorg.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
publicclassOrderDelayMessageListener {

    privatefinalIOrderService orderService;
    privatefinalPayClient payClient;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),
            exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),
            key = MQConstants.DELAY_ORDER_KEY))
    publicvoidlistenOrderDelayMessage(Long orderId){
        // 1.查询订单Order order = orderService.getById(orderId);
        // 2.检测订单状态,判断是否已支付if(order == null|| order.getStatus() != 1){
            // 订单不存在或者已经支付return;
        }
        // 3.未支付,需要查询支付流水状态PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);
        // 4.判断是否支付if(payOrder != null&& payOrder.getStatus() == 3){
            // 4.1.已支付,标记订单状态为已支付orderService.markOrderPaySuccess(orderId);
        }else{
            // TODO 4.2.未支付,取消订单,回复库存orderService.cancelOrder(orderId);
        }
    }
}

注意,这里要在OrderServiceImpl中实现cancelOrder方法,留给读者自行实现。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注