Introduction
A RabbitMQ delay queue (delayed queue) holds a message for a preset period after it is published; consumers do not receive the message until that delay has elapsed. RabbitMQ does not support delay queues natively, but the behavior can be implemented with a plugin or with specific queue configuration.
Application scenarios of delay queues
- Scheduled tasks: for example, sending emails or push notifications at a set time.
- Retry mechanism: when a task fails, it can be retried after a waiting period.
- Delayed consumption: Consume messages after a specific time, rather than immediately.
A dead letter queue (DLQ) is a special queue that stores messages that could not be consumed normally for some reason. Dead letter queues are used in many messaging systems (RabbitMQ, Kafka, and others). Moving unconsumable messages into a dead letter queue helps developers discover and handle errors in the message consumption process.
The main uses of the dead letter queue are:
- Error handling: captures unconsumable messages so they are not lost, and leaves room for later manual intervention or automatic retry.
- Monitoring and diagnosis: inspecting the dead letter queue helps developers quickly spot problems such as queue overflow or malformed messages.
- Preventing data loss: messages that cannot be processed because of timeouts, queue overflow, consumption failures, and similar causes are kept in the dead letter queue, so they remain available for later investigation and processing.
This article combines the delay queue with the dead letter queue. Each message published to the delay queue carries an expiration time (TTL). Nothing consumes the delay queue directly, so when the TTL expires RabbitMQ routes the message to the dead letter exchange, and the consumer of the dead letter queue processes it.
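The flow described above can be modeled in a few lines of plain Java. This is only an in-memory sketch, not RabbitMQ code: `DelayFlowSketch` and its methods are hypothetical names chosen for illustration. It also shows a caveat worth keeping in mind: with per-message TTL on a classic queue, RabbitMQ expires messages only once they reach the head of the queue, so a message with a long TTL can hold back a shorter one queued behind it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of "TTL queue + dead letter queue" (not the RabbitMQ API).
class DelayFlowSketch {

    // A queued message together with the absolute time (ms) at which it expires.
    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<Entry> delayQueue = new ArrayDeque<>();
    private final List<String> deadLetterQueue = new ArrayList<>();

    // Publish with a per-message TTL, similar to setting the expiration property.
    void publish(String body, long nowMillis, long ttlMillis) {
        delayQueue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    // Dead-letter every expired message found at the head of the delay queue.
    void tick(long nowMillis) {
        // Only the head is inspected, so an unexpired head blocks the messages behind it.
        while (!delayQueue.isEmpty() && delayQueue.peekFirst().expiresAt <= nowMillis) {
            deadLetterQueue.add(delayQueue.pollFirst().body);
        }
    }

    // Messages that have been moved to the dead letter queue, in arrival order.
    List<String> deadLetters() {
        return deadLetterQueue;
    }
}
```

In this model, publishing "slow" with a 10 000 ms TTL followed by "fast" with a 1 000 ms TTL and calling `tick(5000)` dead-letters nothing, because "slow" is still at the head; after `tick(10000)` both messages arrive, in the order slow, fast. When all messages use the same delay, as in this article's example, the ordering issue does not arise.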
Integration logic
Dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.30</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.27</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.7</version>
</dependency>
Configuration
server:
  port: 8081
spring:
  application:
    name: walker-rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest_walker
    password: guest
Create the exchange and queue
Delay queue
// Package declaration and project-internal imports omitted.
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class OrderDelayRabbitConfig {

    // Exchange
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(RabbitMqConfigEnum.ORDER_DELAY.getExchange());
    }

    // Queue
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> args = new HashMap<>();
        // The delay queue must point at a dead letter exchange: once a message's TTL
        // expires, the message is routed there with the routing key below.
        args.put(BaseConstant.xDeadLetterExchange, RabbitMqConfigEnum.ORDER_DEAD.getExchange());
        args.put(BaseConstant.xDeadLetterRoutingKey, RabbitMqConfigEnum.ORDER_DEAD.getRoutingKey());
        return QueueBuilder.durable(RabbitMqConfigEnum.ORDER_DELAY.getQueue())
                .autoDelete()
                .withArguments(args)
                .build();
    }

    // Exchange, queue, and routing key binding
    @Bean
    public Binding orderDelayQueueBinding() {
        return BindingBuilder.bind(orderDelayQueue())
                .to(orderDelayExchange())
                .with(RabbitMqConfigEnum.ORDER_DELAY.getRoutingKey());
    }
}
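If every message in the queue should wait the same amount of time, the TTL can also be declared once on the queue through the `x-message-ttl` argument rather than on each message. The helper below is a minimal sketch under that assumption; `DelayQueueArgs` and `build` are hypothetical names, and the returned map is what would be handed to `withArguments(...)` when declaring the delay queue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that assembles the arguments of a delay queue.
final class DelayQueueArgs {

    private DelayQueueArgs() {
    }

    // queueTtlMillis may be null, in which case only the dead letter settings are added
    // and the TTL is expected to be set per message.
    static Map<String, Object> build(String deadLetterExchange,
                                     String deadLetterRoutingKey,
                                     Long queueTtlMillis) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        if (queueTtlMillis != null) {
            if (queueTtlMillis < 0) {
                throw new IllegalArgumentException("TTL must not be negative");
            }
            // Queue-level TTL in milliseconds, applied to every message in the queue.
            args.put("x-message-ttl", queueTtlMillis);
        }
        return args;
    }
}
```

When both a queue-level and a per-message TTL are present, RabbitMQ applies the lower of the two values.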
Dead letter queue
// Package declaration and project-internal imports omitted.
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OrderDeadRabbitConfig {

    // Exchange
    @Bean
    public DirectExchange orderDeadExchange() {
        return new DirectExchange(RabbitMqConfigEnum.ORDER_DEAD.getExchange());
    }

    // Queue
    @Bean
    public Queue orderDeadQueue() {
        return QueueBuilder.durable(RabbitMqConfigEnum.ORDER_DEAD.getQueue()).autoDelete().build();
    }

    // Exchange, queue, and routing key binding
    @Bean
    public Binding orderDeadQueueBinding() {
        return BindingBuilder.bind(orderDeadQueue())
                .to(orderDeadExchange())
                .with(RabbitMqConfigEnum.ORDER_DEAD.getRoutingKey());
    }
}
Constants
// Package declaration omitted.
public interface BaseConstant {
    String xDeadLetterExchange = "x-dead-letter-exchange";
    String xDeadLetterRoutingKey = "x-dead-letter-routing-key";
}
Related enumeration
// Package declaration omitted.
import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum RabbitMqConfigEnum {

    ORDER_DELAY("direct", "Order Delay Queue", "order_delay_exchange",
            "order_delay_queue", "order_delay_routing_key", true),
    ORDER_DEAD("direct", "Order Dead Letter Queue", "order_dead_exchange",
            "order_dead_queue", "order_dead_routing_key", true);

    private final String type;
    private final String title;
    private final String exchange;
    private final String queue;
    private final String routingKey;
    private final Boolean durable;
}
RabbitMQ utility class
// Package declaration and project-internal imports omitted.
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

@Component
@Slf4j
public class RabbitmqService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * author: walker
     * time: 2024/12/30
     * description: Manual acknowledgement for a handler that returns no result
     */
    public void doWithManualAck(String event, Message message, Channel channel, Consumer<String> method) {
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        try {
            method.accept(data);
            sendAck(message, channel);
        } catch (Exception e) {
            log.error("Failed to handle event [{}], data: {}, cause:", event, data, e);
            sendNack(message, channel);
        }
    }

    /**
     * author: walker
     * time: 2024/12/30
     * description: Acknowledge or reject according to the Boolean returned by the handler
     */
    public void doWithManualAck(String event, Message message, Channel channel, Function<String, Boolean> method) {
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        try {
            Boolean res = method.apply(data);
            if (Boolean.TRUE.equals(res)) {
                sendAck(message, channel);
            } else {
                sendNack(message, channel);
            }
        } catch (Exception e) {
            log.error("Failed to handle event [{}], data: {}, cause:", event, data, e);
            sendNack(message, channel);
        }
    }

    // Acknowledge the message
    public void sendAck(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    // Reject the message. requeue = true puts it back on the queue for redelivery,
    // so a message that always fails will be redelivered repeatedly.
    public void sendNack(Message message, Channel channel) {
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } catch (IOException e) {
            log.error("Failed to reject message, message ID: {}", message.getMessageProperties().getMessageId(), e);
        }
    }

    // Publish data as JSON with a per-message TTL of delayTime milliseconds
    public <T> void sendDelayMsg(RabbitMqConfigEnum configEnum, T data, Integer delayTime) {
        rabbitTemplate.convertAndSend(configEnum.getExchange(), configEnum.getRoutingKey(),
                JSONUtil.toJsonStr(data), message -> {
                    // The expiration property is a string holding the TTL in milliseconds
                    message.getMessageProperties().setExpiration(String.valueOf(delayTime));
                    return message;
                });
    }
}
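The per-message TTL travels in the message's expiration property, which is a string holding the delay in milliseconds. A small helper can make the unit explicit and reject invalid delays before a message is published. The following is a sketch only; `ExpirationSketch` and `toExpiration` are hypothetical names that are not part of Spring AMQP or of the utility class above.

```java
import java.time.Duration;

// Hypothetical helper: converts a delay into the string form used by the expiration property.
final class ExpirationSketch {

    private ExpirationSketch() {
    }

    static String toExpiration(Duration delay) {
        if (delay == null || delay.isNegative()) {
            throw new IllegalArgumentException("delay must be zero or positive");
        }
        // RabbitMQ interprets the expiration string as a number of milliseconds.
        return String.valueOf(delay.toMillis());
    }
}
```

For example, `ExpirationSketch.toExpiration(Duration.ofSeconds(10))` yields `"10000"`, the same 10-second delay used in the order example of this article.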
Example
Entity Class
// Package declaration omitted.
import lombok.Data;

@Data
public class OrderInfo {
    private String productName;
    private Integer num;
    private String userName;
    private String orderTime;
}
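Controller: create an order
// Package declaration and project-internal imports omitted.
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private RabbitmqService rabbitmqService;

    /**
     * author: walker
     * time: 2025/1/2
     * description: Create an order
     */
    @GetMapping("/create")
    public void createOrder() {
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setProductName("badminton");
        orderInfo.setUserName("Zhang San");
        orderInfo.setOrderTime("2025-01-02 12:00:00");
        orderInfo.setNum(1);
        log.info("Message sent at {}", new Date());
        // Delay of 10000 ms (10 seconds)
        rabbitmqService.sendDelayMsg(RabbitMqConfigEnum.ORDER_DELAY, orderInfo, 10000);
    }
}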
Consumers listen to messages
Consumer interface
// Package declaration omitted.
public interface IRabbitCosumerHandler {
    Boolean handle(String message);
}
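Consumer listener method
// Package declaration and project-internal imports omitted.
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class OrderDeadLetterQueueConsumer implements IRabbitCosumerHandler {

    @Resource
    private RabbitmqService rabbitmqService;

    // The queue name matches RabbitMqConfigEnum.ORDER_DEAD.getQueue(). A SpEL expression,
    // #{T(<fully qualified enum name>).ORDER_DEAD.getQueue()}, can be used instead of the literal.
    @RabbitListener(queues = "order_dead_queue", ackMode = "MANUAL")
    public void receive(Message message, Channel channel) throws IOException {
        log.info("Message received at {}", new Date());
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Current time: {}, queue {} received a message: {}",
                new Date(), RabbitMqConfigEnum.ORDER_DEAD.getQueue(), msg);
        rabbitmqService.doWithManualAck(RabbitMqConfigEnum.ORDER_DEAD.getTitle(), message, channel, this::handle);
    }

    @Override
    public Boolean handle(String message) {
        // Processing logic
        log.info("Perform an order cancellation operation");
        return true;
    }
}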
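The processing logic in `handle` is left as a log statement. For an order-timeout scenario, one common approach is to cancel the order only if it is still unpaid when the delayed message arrives, so that a redelivered message does no harm. The sketch below assumes a hypothetical in-memory order store keyed by an order id (the `OrderInfo` class in this article has no such field); all names in it are illustrative and not part of the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical order store used to illustrate an idempotent timeout handler.
class OrderTimeoutSketch {

    enum Status { UNPAID, PAID, CANCELLED }

    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void create(String orderId) {
        orders.put(orderId, Status.UNPAID);
    }

    void pay(String orderId) {
        // Only an unpaid order can become paid.
        orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    // Called when the delayed message arrives. Returns true when the message can be acknowledged.
    boolean handleTimeout(String orderId) {
        // Atomic transition: cancel only if the order is still unpaid.
        // Paid, already cancelled, or unknown orders are left untouched.
        orders.replace(orderId, Status.UNPAID, Status.CANCELLED);
        return true;
    }

    Status statusOf(String orderId) {
        return orders.get(orderId);
    }
}
```

Because the handler only performs the UNPAID to CANCELLED transition, processing the same message twice leaves the order in the same state, which suits a consumer that may see redeliveries.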