RabbitMq延迟消费(TTL实现)

延时消费

RabbitMQ本身并不提供延迟队列的功能,但是我们仍然可以使用RabbitMQ的 TTL(Time-To-Live) 和 DLX(Dead Letter Exchanges) 这两个扩展特性来实现延迟队列,实现消息的延迟消费和延迟重试的功能。

实现结果

  • 固定时间延迟消费

    固定时间延迟

  • 指定时间消费
    指定时间消费

具体实现

连接配置

package com.itdfq.delay.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: QianMo
 * @Date: 2021/10/19 18:34
 * @Description:
 */
@Configuration
public class RabbitConfig {

    @Value("${spring.rabbitmq.addresses}")
    private String address;
    @Value("${spring.rabbitmq.port}")
    private String port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;


    //连接工厂
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(address + ":" + port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        //TODO  消息发送确认--回调
//        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;

    }

    //RabbitAdmin类封装对RabbitMQ的管理操作
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    //使用Template
    @Bean
    public RabbitTemplate newRabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        //设置监听确认mq(交换器)接受到信息
        rabbitTemplate.setConfirmCallback(confirmCallback());
        //添加监听 失败鉴定(路由没有收到)
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallback());
        return rabbitTemplate;
    }


    //****************生产者发送确认********************
    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if (b) {
                    System.out.println("发送者确认发送给mq成功");
                } else {
                    System.out.println("发送者发送失败,考虑重发" + s);
                }
            }
        };
    }

    //****************失败通知********************
    //失败才通知,成功不通知
    @Bean
    public RabbitTemplate.ReturnCallback returnCallback() {
        return new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String replayText, String exchange, String rountingKey) {
                System.out.println("无效路由信息,需要考虑另外处理");
                System.out.println("Returned replayText:" + replayText);
                System.out.println("Returned exchange:" + exchange);
                System.out.println("Returned rountingKey:" + rountingKey);
                String s = new String(message.getBody());
                System.out.println("Returned Message:" + s);
            }
        };
    }


}

设置延时队列

  /**
     * 延时队列
     *
     *
     * @return
     */
    @Bean
    public Queue DelayQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", RabbitMqConstant.IMMEDIATE_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
        // x-message-ttl 声明该队列死信可存活时间
        params.put("x-message-ttl", RabbitMqConstant.DELAY_TIME);
        return new Queue(RabbitMqConstant.DELAY_QUEUE, true, false, false, params);
    }

设置立即消费监听

package com.itdfq.delay.message.listen;

import com.itdfq.delay.constant.RabbitMqConstant;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: QianMo
 * @Date: 2021/10/21 10:29
 * @Description:
 */
@Configuration
public class DelayListenConfig {


    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private ConnectionFactory connectionFactory;


    /********************立即消费配置**********************/

    @Bean
    public DirectExchange Exchange() {
        DirectExchange exchange = new DirectExchange(
                RabbitMqConstant.IMMEDIATE_EXCHANGE, true, false);
        exchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return exchange;
    }

    @Bean
    public Queue Queue() {
        Queue queue = new Queue(RabbitMqConstant.IMMEDIATE_QUEUE, true, false, false);
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding subscribeNotifyBinding() {
        Binding binding = BindingBuilder.bind(Queue()).to(Exchange())
                .with(RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }


    @Bean
    public SimpleMessageListenerContainer container(
            @Qualifier(value = "delayRabbitmqListener") DelayRabbitmqListener delayRabbitmqListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(Queue());
        container.setMessageListener(delayRabbitmqListener);
        container.setDefaultRequeueRejected(false);
        //手动提交
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //设置消费者ack消息的模式,默认是自动,此处设置为手动
//        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }

}

指定时间(延时消费)

设置延时队列
   /**
     * 延时队列,可变延时
     *
     * @return
     */
    @Bean
    public Queue variableDelayQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", RabbitMqConstant.IMMEDIATE_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
        return new Queue(RabbitMqConstant.DELAY_VARIABLE_QUEUE_KEY, true, false, false, params);
    }
消费者
 /**
     * 可变延时消费
     * @param msg  消息
     * @param expiration 延时时间单位 : 毫秒
     */
    public void send(String msg,Integer expiration){
        rabbitTemplate.convertAndSend(RabbitMqConstant.DELAY_VARIABLE_EXCHANGE_KEY,
                RabbitMqConstant.DELAY_VARIABLE_ROUTING_KEY, msg,
                message -> {
                    log.info("可变延时消费发送消息: {}, and expiration in {}ms", msg, expiration);
                    message.getMessageProperties().setExpiration(expiration.toString());
                    return message;
                });
    }

   转载规则


《RabbitMq延迟消费(TTL实现)》 duan 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录