SoFunction
Updated on 2025-04-05

Spring Boot: delayed message consumption with a RabbitMQ delay queue and dead letter queue

Introduction

A delay queue (delayed queue) holds a message for a set period after it is published; consumers receive the message only once the preset delay has elapsed. RabbitMQ does not support delay queues natively, but the behavior can be achieved with a plugin or with specific queue configuration.

Application scenarios of delay queues

  • Scheduled tasks: for example, sending emails or push notifications at a set time.
  • Retry mechanism: when a task fails, it can be retried after a waiting period.
  • Delayed consumption: messages are consumed after a specific time rather than immediately.
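
For the retry scenario, the delay is often increased on each attempt. The following is a minimal sketch, assuming an exponential backoff policy; the class `RetryDelays` and its parameters are hypothetical and not part of the original project.

```java
import java.time.Duration;

// Hypothetical helper: computes how long to delay the n-th retry of a failed task.
final class RetryDelays {
    private RetryDelays() {}

    /**
     * Exponential backoff: base * 2^(attempt - 1), capped at max.
     * attempt is 1-based (1 = first retry).
     */
    static Duration delayFor(int attempt, Duration base, Duration max) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long baseMs = base.toMillis();
        long maxMs = max.toMillis();
        // Limit the exponent so the factor itself stays well inside a long.
        long factor = 1L << Math.min(attempt - 1, 30);
        // Comparing against maxMs / factor applies the cap and avoids overflow.
        if (baseMs > maxMs / factor) {
            return max;
        }
        return Duration.ofMillis(baseMs * factor);
    }
}
```

The resulting duration, converted to milliseconds, could be used as the delay of the retried message.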

A dead letter queue (DLQ) is a special queue that stores messages that could not be consumed normally for some reason. Dead letter queues are used in many messaging systems (such as RabbitMQ and Kafka). Routing unconsumable messages to a dead letter queue helps developers discover and handle exceptions or errors in the message consumption process.

The main uses of the dead letter queue are:

  • Error handling: captures messages that cannot be consumed so that they are not lost, and leaves room for later manual intervention or automatic retry.
  • Monitoring and diagnosis: inspecting the dead letter queue helps developers quickly spot problems such as queue overflow or malformed messages.
  • Preventing data loss: messages that cannot be processed because of a timeout, queue overflow, consumption failure, or similar reasons are kept in the dead letter queue for later investigation and handling.
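
When diagnosing dead-lettered messages, it helps to know why each one arrived. RabbitMQ records this in the `x-death` message header, a list of entries ordered most recent first, each carrying a `reason` such as `expired`, `rejected`, or `maxlen`. The sketch below reads that reason from a plain headers map; the class `DeadLetterInfo` is a hypothetical helper, and obtaining the map from the received message is left to the caller.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: extracts the dead-letter reason from message headers.
final class DeadLetterInfo {
    private DeadLetterInfo() {}

    /** Returns the reason of the most recent dead-lettering event, if the header is present. */
    static Optional<String> reason(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List<?>)) {
            return Optional.empty();
        }
        List<?> entries = (List<?>) xDeath;
        if (entries.isEmpty() || !(entries.get(0) instanceof Map<?, ?>)) {
            return Optional.empty();
        }
        Object reason = ((Map<?, ?>) entries.get(0)).get("reason");
        // toString() is used because the client may deliver the value as a non-String type.
        return reason == null ? Optional.empty() : Optional.of(reason.toString());
    }
}
```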

This article combines the delay queue and the dead letter queue. Each message published to the delay queue carries an expiration time (TTL). No consumer listens on the delay queue, so once the TTL expires RabbitMQ forwards the message to the dead letter queue, where it is actually processed.
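
One caveat of this approach: with per-message TTL, RabbitMQ only expires a message once it reaches the head of the queue, so a message with a short TTL that sits behind one with a long TTL is dead-lettered late. The following in-memory model (no broker involved; `DelayQueueModel` and its methods are hypothetical names) illustrates that behavior.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of a TTL queue that dead-letters messages only from its head.
final class DelayQueueModel {
    private static final class Entry {
        final String body;
        final long expiresAtMs;

        Entry(String body, long expiresAtMs) {
            this.body = body;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();

    /** Enqueues a message that expires ttlMs after nowMs. */
    void publish(String body, long nowMs, long ttlMs) {
        queue.addLast(new Entry(body, nowMs + ttlMs));
    }

    /** Moves expired messages from the head of the queue into the returned dead-letter list. */
    List<String> expireAt(long nowMs) {
        List<String> deadLettered = new ArrayList<>();
        // Stop at the first unexpired head, even if later entries have already expired.
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMs <= nowMs) {
            deadLettered.add(queue.pollFirst().body);
        }
        return deadLettered;
    }
}
```

Publishing "slow" with a 30-second TTL followed by "fast" with a 5-second TTL, `expireAt(10000)` releases nothing; both messages are released together at 30 seconds. When every message uses the same delay, as in the example in this article, the ordering problem does not arise.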

Integration logic

Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.30</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.27</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.7</version>
</dependency>

Configuration

server:
  port: 8081
spring:
  application:
    name: walker-rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest_walker
    password: guest

Create the exchanges and queues

Delay queue

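// Add your own package declaration and imports for the project classes
// (BaseConstant, RabbitMqConfigEnum).
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class OrderDelayRabbitConfig {

    // Exchange
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(RabbitMqConfigEnum.ORDER_DELAY.getExchange());
    }

    // Queue
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> args = new HashMap<>();
        // The delay queue must point at the dead letter exchange:
        // once a message's TTL expires, it is routed there.
        args.put(BaseConstant.xDeadLetterExchange, RabbitMqConfigEnum.ORDER_DEAD.getExchange());
        args.put(BaseConstant.xDeadLetterRoutingKey, RabbitMqConfigEnum.ORDER_DEAD.getRoutingKey());
        // durable(...) is assumed here, matching the durable flag in RabbitMqConfigEnum.
        return QueueBuilder.durable(RabbitMqConfigEnum.ORDER_DELAY.getQueue()).autoDelete().withArguments(args).build();
    }

    // Bind the queue to the exchange with the routing key
    @Bean
    public Binding orderDelayQueueBinding() {
        return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(RabbitMqConfigEnum.ORDER_DELAY.getRoutingKey());
    }
}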
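
The configuration above leaves the TTL to each message. When every message should wait the same amount of time, the TTL can instead be declared on the queue through the `x-message-ttl` argument (in milliseconds). Below is a minimal sketch that builds such an argument map in plain Java; `DelayQueueArgs` is a hypothetical helper, not part of the original project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: queue arguments for a delay queue with a queue-level TTL.
final class DelayQueueArgs {
    private DelayQueueArgs() {}

    static Map<String, Object> build(String deadExchange, String deadRoutingKey, int ttlMs) {
        if (ttlMs < 0) {
            throw new IllegalArgumentException("ttlMs must be >= 0");
        }
        Map<String, Object> args = new HashMap<>();
        // Where expired messages are sent.
        args.put("x-dead-letter-exchange", deadExchange);
        args.put("x-dead-letter-routing-key", deadRoutingKey);
        // Every message in the queue expires after ttlMs milliseconds.
        args.put("x-message-ttl", ttlMs);
        return args;
    }
}
```

The returned map could be passed to `withArguments(...)` in place of the `args` map built in `orderDelayQueue()`. If a message also carries its own expiration, RabbitMQ applies the lower of the two values.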

Dead letter queue

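// Add your own package declaration and the import for RabbitMqConfigEnum.
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OrderDeadRabbitConfig {

    // Exchange
    @Bean
    public DirectExchange orderDeadExchange() {
        return new DirectExchange(RabbitMqConfigEnum.ORDER_DEAD.getExchange());
    }

    // Queue
    @Bean
    public Queue orderDeadQueue() {
        // durable(...) is assumed here, matching the durable flag in RabbitMqConfigEnum.
        return QueueBuilder.durable(RabbitMqConfigEnum.ORDER_DEAD.getQueue()).autoDelete().build();
    }

    // Bind the queue to the exchange with the routing key
    @Bean
    public Binding orderDeadQueueBinding() {
        return BindingBuilder.bind(orderDeadQueue()).to(orderDeadExchange())
                .with(RabbitMqConfigEnum.ORDER_DEAD.getRoutingKey());
    }
}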

Constants

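// Add your own package declaration.
public interface BaseConstant {
    // Queue argument: exchange that receives dead-lettered messages
    String xDeadLetterExchange = "x-dead-letter-exchange";
    // Queue argument: routing key used when dead-lettering
    String xDeadLetterRoutingKey = "x-dead-letter-routing-key";
}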

Related enumeration

// Add your own package declaration.
import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum RabbitMqConfigEnum {
    ORDER_DELAY("direct", "Order Delay Queue", "order_delay_exchange", "order_delay_queue", "order_delay_routing_key", true),
    ORDER_DEAD("direct", "Order Dead Letter Queue", "order_dead_exchange", "order_dead_queue", "order_dead_routing_key", true);

    private final String type;
    private final String title;
    private final String exchange;
    private final String queue;
    private final String routingKey;
    private final Boolean durable;
}

RabbitMQ utility class

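// Add your own package declaration and the import for RabbitMqConfigEnum.
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;

@Component
@Slf4j
public class RabbitmqService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * author: walker
     * time: 2024/12/30
     * description: Manual acknowledgement for a handler that returns no result
     */
    public void doWithManualAck(String event, Message message, Channel channel, Consumer<String> method) {
        String data = new String(message.getBody());
        try {
            method.accept(data);
            sendAck(message, channel);
        } catch (Exception e) {
            log.error("Error handling event [{}], data: {}, cause:", event, data, e);
            sendNack(message, channel);
        }
    }

    /**
     * author: walker
     * time: 2024/12/30
     * description: Acknowledge or reject according to the Boolean returned by the handler
     */
    public void doWithManualAck(String event, Message message, Channel channel, Function<String, Boolean> method) {
        String data = new String(message.getBody());
        try {
            Boolean res = method.apply(data);
            if (BooleanUtil.isTrue(res)) {
                sendAck(message, channel);
            } else {
                sendNack(message, channel);
            }
        } catch (Exception e) {
            log.error("Error handling event [{}], data: {}, cause:", event, data, e);
            sendNack(message, channel);
        }
    }

    // Acknowledge the message
    public void sendAck(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    // Reject the message and requeue it
    public void sendNack(Message message, Channel channel) {
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } catch (IOException e) {
            log.error("Message consumption failed, message ID: {}", message.getMessageProperties().getMessageId(), e);
        }
    }

    // Send a message that expires after delayTime milliseconds
    public <T> void sendDelayMsg(RabbitMqConfigEnum configEnum, T data, Integer delayTime) {
        rabbitTemplate.convertAndSend(configEnum.getExchange(), configEnum.getRoutingKey(),
                JSONUtil.toJsonStr(data), new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // The expiration is a string holding the TTL in milliseconds
                        message.getMessageProperties().setExpiration(String.valueOf(delayTime));
                        return message;
                    }
                });
    }
}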
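
The per-message expiration is a string containing the TTL in milliseconds, while `sendDelayMsg` accepts a bare `Integer` without a unit. A small conversion helper can make the unit explicit at the call site. This is only a sketch; `Expirations` is a hypothetical class that does not appear in the original project.

```java
import java.time.Duration;

// Hypothetical helper: turns a Duration into the millisecond string used as message expiration.
final class Expirations {
    private Expirations() {}

    static String fromDuration(Duration delay) {
        if (delay == null || delay.isNegative()) {
            throw new IllegalArgumentException("delay must be non-negative");
        }
        return String.valueOf(delay.toMillis());
    }
}
```

For instance, `Expirations.fromDuration(Duration.ofSeconds(10))` yields the same value that the controller example passes as `10000`.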

Example

Entity Class

// Add your own package declaration.
import lombok.Data;

@Data
public class OrderInfo {
    private String productName;
    private Integer num;
    private String userName;
    private String orderTime;
}

Controller: create an order

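// Add your own package declaration and imports for the project classes
// (RabbitMqConfigEnum, OrderInfo, RabbitmqService).
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private RabbitmqService rabbitmqService;

    /**
     * author: walker
     * time: 2025/1/2
     * description: Create an order
     */
    @GetMapping("/create")
    public void createOrder() {
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setProductName("badminton");
        orderInfo.setUserName("Zhang San");
        orderInfo.setOrderTime("2025-01-02 12:00:00");
        orderInfo.setNum(1);
        log.info("Message sent at {}", new Date());
        // Delay of 10000 ms (10 seconds)
        rabbitmqService.sendDelayMsg(RabbitMqConfigEnum.ORDER_DELAY, orderInfo, 10000);
    }
}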

Consumers listen to messages

Consumer interface

// Add your own package declaration.
public interface IRabbitCosumerHandler {
    // Returns true if the message was processed successfully
    Boolean handle(String message);
}

Consumer listener

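// Add your own package declaration and imports for the project classes
// (RabbitMqConfigEnum, IRabbitCosumerHandler, RabbitmqService).
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;

@Slf4j
@Component
public class OrderDeadLetterQueueConsumer implements IRabbitCosumerHandler {

    @Resource
    private RabbitmqService rabbitmqService;

    // "your.package" is a placeholder: use the fully qualified name of RabbitMqConfigEnum in your project.
    @RabbitListener(queues = "#{T(your.package.RabbitMqConfigEnum).ORDER_DEAD.getQueue()}", ackMode = "MANUAL")
    public void receive(Message message, Channel channel) throws IOException {
        log.info("Message received at {}", new Date());
        String msg = new String(message.getBody());
        log.info("Current time: {}, queue {} received a message: {}", new Date().toString(), RabbitMqConfigEnum.ORDER_DEAD.getQueue(), msg);
        rabbitmqService.doWithManualAck(RabbitMqConfigEnum.ORDER_DEAD.getTitle(), message, channel, this::handle);
    }

    @Override
    public Boolean handle(String message) {
        // Processing logic
        log.info("Perform the order cancellation operation");
        return true;
    }
}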
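
Note that `sendNack` in the utility class rejects with requeue set to `true`, so a message whose handler always fails would be redelivered again and again. One possible safeguard, sketched below under the assumption that a single retry is acceptable, is to requeue only on the first failure and drop the message once it has already been redelivered. `AckPolicy` and `Decision` are hypothetical names, not part of the original project.

```java
// Hypothetical policy: decides how to answer the broker after a handler has run.
final class AckPolicy {
    enum Decision { ACK, REQUEUE, DISCARD }

    private AckPolicy() {}

    /**
     * @param handled     whether the handler processed the message successfully
     * @param redelivered whether the broker flagged this delivery as a redelivery
     */
    static Decision decide(boolean handled, boolean redelivered) {
        if (handled) {
            return Decision.ACK;
        }
        // First failure: try once more. A failure on a redelivery: stop requeueing.
        return redelivered ? Decision.DISCARD : Decision.REQUEUE;
    }
}
```

`ACK` would map to `basicAck`, `REQUEUE` to `basicNack` with requeue `true`, and `DISCARD` to `basicNack` with requeue `false`.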
