A delay queue is a message queue in which a message becomes available to consumers only after a specified delay. Delay queues play a key role in microservice architectures, e-commerce systems and task scheduling: automatically cancelling orders that time out, sending scheduled reminders and handling delayed payments all rely on them.
As a high-performance in-memory database, Redis offers atomic operations, rich data structures and a simple API. This article introduces four ways to implement a distributed delay queue on top of Redis.
1. Delay queue based on Sorted Set
Principle
This approach uses a Redis Sorted Set: the message ID is stored as the member and the execution timestamp as the score. The ZRANGEBYSCORE command then returns the tasks whose execution time has been reached.
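To make the member/score idea concrete, here is a minimal in-memory sketch of the lookup a sorted set performs. It does not talk to Redis at all: the class SortedSetModel and its methods are hypothetical stand-ins for ZADD, ZRANGEBYSCORE and ZREM, and the timestamps in the usage note are made-up values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for a Redis Sorted Set (illustration only).
class SortedSetModel {
    private final Map<String, Long> scoreOf = new HashMap<>();               // member -> score
    private final TreeMap<Long, TreeSet<String>> byScore = new TreeMap<>();  // score -> members

    // Like ZADD: insert the member, or move it if it already has a score.
    void add(String member, long score) {
        remove(member);
        scoreOf.put(member, score);
        byScore.computeIfAbsent(score, s -> new TreeSet<>()).add(member);
    }

    // Like ZRANGEBYSCORE key min max: members with min <= score <= max, ordered by score.
    List<String> rangeByScore(long min, long max) {
        List<String> result = new ArrayList<>();
        for (TreeSet<String> members : byScore.subMap(min, true, max, true).values()) {
            result.addAll(members);
        }
        return result;
    }

    // Like ZREM: true only for the caller that actually removed the member.
    boolean remove(String member) {
        Long score = scoreOf.remove(member);
        if (score == null) {
            return false;
        }
        TreeSet<String> members = byScore.get(score);
        members.remove(member);
        if (members.isEmpty()) {
            byScore.remove(score);
        }
        return true;
    }
}
```

Adding three tasks with execution times 1000, 3000 and 2000 and then calling rangeByScore(0, 2000) returns the first and third task in time order, which is exactly the set of tasks that are due at time 2000.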
Code implementation
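
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.springframework.data.redis.core.StringRedisTemplate;

    public class RedisZSetDelayQueue {
        private final StringRedisTemplate redisTemplate;
        private final String queueKey = "delay_queue:tasks";
        private final String detailsKey = "delay_queue:details";

        public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
        }

        /**
         * Add a delayed task.
         * @param taskId    task ID
         * @param taskInfo  task information (JSON string)
         * @param delayTime delay in seconds
         */
        public void addTask(String taskId, String taskInfo, long delayTime) {
            // Calculate the execution time in milliseconds
            long executeTime = System.currentTimeMillis() + delayTime * 1000;
            // Store the task details
            redisTemplate.opsForHash().put(detailsKey, taskId, taskInfo);
            // Add the task ID to the delay queue, scored by execution time
            redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);
            System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);
        }

        /**
         * Poll for expired tasks.
         */
        public List<String> pollTasks() {
            long now = System.currentTimeMillis();
            // Get the tasks whose execution time is not later than now
            Set<String> taskIds = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now);
            if (taskIds == null || taskIds.isEmpty()) {
                return Collections.emptyList();
            }
            List<String> tasks = new ArrayList<>();
            for (String taskId : taskIds) {
                // ZREM reports how many members it removed. Only the poller that
                // actually removed the ID handles the task, so concurrent pollers
                // do not process it twice.
                Long removed = redisTemplate.opsForZSet().remove(queueKey, taskId);
                if (removed == null || removed == 0) {
                    continue;
                }
                // Get the task details, then delete them
                String taskInfo = (String) redisTemplate.opsForHash().get(detailsKey, taskId);
                if (taskInfo != null) {
                    tasks.add(taskInfo);
                    redisTemplate.opsForHash().delete(detailsKey, taskId);
                }
            }
            return tasks;
        }

        // Scheduled polling example
        public void startTaskProcessor() {
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    for (String task : pollTasks()) {
                        processTask(task);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 0, 1, TimeUnit.SECONDS);
        }

        private void processTask(String taskInfo) {
            System.out.println("Processing task: " + taskInfo);
            // Actual task processing logic
        }
    }
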
Pros and cons
Advantages
- Simple to implement and easy to understand
- Tasks are automatically sorted by execution time
- Supports precise time control
Disadvantages
- Expired tasks must be fetched by polling, which consumes CPU
- With a large number of tasks, ZRANGEBYSCORE calls may hurt performance
- There is no consumption acknowledgement mechanism; it has to be implemented separately
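One possible way to soften the polling cost, sketched here under assumptions: instead of waking at a fixed interval, the poller could look at the earliest score in the sorted set (for example with ZRANGE key 0 0 WITHSCORES) and sleep until that task is due, capped by a maximum idle time so that tasks added later with an earlier time are still noticed. The class PollBackoff, its method and the cap parameter are hypothetical, and the Redis lookup itself is left out.

```java
// Hypothetical helper: decides how long a poller should sleep before its next check.
class PollBackoff {
    /**
     * @param earliestExecuteTime score of the first member in the sorted set,
     *                            or null if the set is empty
     * @param now                 current time in milliseconds
     * @param maxIdleMillis       upper bound on the sleep, so that tasks added
     *                            later with an earlier time are still picked up
     * @return milliseconds to wait before polling again
     */
    static long nextDelayMillis(Long earliestExecuteTime, long now, long maxIdleMillis) {
        if (earliestExecuteTime == null) {
            return maxIdleMillis;   // nothing queued: idle for the full cap
        }
        long untilDue = earliestExecuteTime - now;
        if (untilDue <= 0) {
            return 0;               // a task is already due: poll immediately
        }
        return Math.min(untilDue, maxIdleMillis);
    }
}
```

The trade-off is the cap: a small value keeps latency low for newly added short-delay tasks, while a large value saves more round trips when the queue is mostly idle.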
2. Delay queue based on List + timed polling
Principle
This approach uses several Lists as storage and assigns each task to a queue according to its delay time. A timer polls each queue and moves expired tasks to a ready queue for immediate execution.
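The assignment of a task to a delay level can be sketched on its own, without Redis. The snippet below assumes the same four levels and key names that the implementation in this section uses; the class DelayLevels and its method names are hypothetical. It picks the smallest level that is not shorter than the requested delay and falls back to the largest level otherwise.

```java
import java.util.TreeMap;

// Hypothetical helper that maps a requested delay to one of the fixed delay levels.
class DelayLevels {
    private final TreeMap<Integer, String> levels = new TreeMap<>();

    DelayLevels() {
        levels.put(5, "delay_queue:delay_5s");      // 5 seconds
        levels.put(60, "delay_queue:delay_1m");     // 1 minute
        levels.put(300, "delay_queue:delay_5m");    // 5 minutes
        levels.put(1800, "delay_queue:delay_30m");  // 30 minutes
    }

    // Smallest level >= delaySeconds; the largest level if the delay exceeds all of them.
    int levelFor(int delaySeconds) {
        Integer level = levels.ceilingKey(delaySeconds);
        return level != null ? level : levels.lastKey();
    }

    // Redis key of the list that holds tasks for the chosen level.
    String queueKeyFor(int delaySeconds) {
        return levels.get(levelFor(delaySeconds));
    }
}
```

For example, a 100-second delay lands in the 5-minute list, which is why the accuracy of this design depends on how finely the levels are graded.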
Code implementation

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.springframework.data.redis.core.StringRedisTemplate;

    public class RedisListDelayQueue {
        private final StringRedisTemplate redisTemplate;
        private final String readyQueueKey = "delay_queue:ready";  // Ready queue
        private final Map<Integer, String> delayQueueKeys;         // Delay queues, graded by delay time

        public RedisListDelayQueue(StringRedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
            // Initialize queues with different delay levels
            delayQueueKeys = new HashMap<>();
            delayQueueKeys.put(5, "delay_queue:delay_5s");      // 5 seconds
            delayQueueKeys.put(60, "delay_queue:delay_1m");     // 1 minute
            delayQueueKeys.put(300, "delay_queue:delay_5m");    // 5 minutes
            delayQueueKeys.put(1800, "delay_queue:delay_30m");  // 30 minutes
        }

        /**
         * Add a delayed task.
         */
        public void addTask(String taskInfo, int delaySeconds) {
            // Select the appropriate delay level and its queue
            int level = selectDelayLevel(delaySeconds);
            String queueKey = delayQueueKeys.get(level);
            // The delay is rounded to the chosen level (and capped at the largest one)
            // so that each list stays ordered by execution time, which the
            // head-of-queue check in moveTasksToReadyQueue() relies on.
            long executeTime = System.currentTimeMillis() + level * 1000L;
            String taskData = executeTime + ":" + taskInfo;
            // Add to the delay queue
            redisTemplate.opsForList().rightPush(queueKey, taskData);
            System.out.println("Task added to " + queueKey + ": " + taskData);
        }

        /**
         * Select the smallest delay level that is not shorter than the requested delay.
         */
        private int selectDelayLevel(int delaySeconds) {
            return delayQueueKeys.keySet().stream()
                    .filter(delay -> delay >= delaySeconds)
                    .min(Integer::compareTo)
                    .orElse(Collections.max(delayQueueKeys.keySet()));
        }

        /**
         * Move expired tasks to the ready queue.
         */
        public void moveTasksToReadyQueue() {
            long now = System.currentTimeMillis();
            // Traverse all delay queues
            for (String queueKey : delayQueueKeys.values()) {
                boolean hasMoreTasks = true;
                while (hasMoreTasks) {
                    // Peek at the task at the head of the queue
                    String taskData = redisTemplate.opsForList().index(queueKey, 0);
                    if (taskData == null) {
                        hasMoreTasks = false;
                        continue;
                    }
                    // Parse the task execution time
                    long executeTime = Long.parseLong(taskData.split(":", 2)[0]);
                    if (executeTime <= now) {
                        // LPOP itself is atomic, but the peek above and this pop are
                        // two separate commands; with several movers this needs
                        // extra protection (for example a lock or a script).
                        String task = redisTemplate.opsForList().leftPop(queueKey);
                        // The task may have been taken by another process, so check again
                        if (task != null) {
                            // Extract the task information and add it to the ready queue
                            String taskInfo = task.split(":", 2)[1];
                            redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);
                            System.out.println("Task moved to ready queue: " + taskInfo);
                        }
                    } else {
                        // The head task has not expired, so the tasks behind it have not either
                        hasMoreTasks = false;
                    }
                }
            }
        }

        /**
         * Get a ready task.
         */
        public String getReadyTask() {
            return redisTemplate.opsForList().leftPop(readyQueueKey);
        }

        /**
         * Start the task processors.
         */
        public void startTaskProcessors() {
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
            // Thread that moves expired tasks
            scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);
            // Thread that handles ready tasks
            scheduler.scheduleAtFixedRate(() -> {
                String task = getReadyTask();
                if (task != null) {
                    processTask(task);
                }
            }, 0, 100, TimeUnit.MILLISECONDS);
        }

        private void processTask(String taskInfo) {
            System.out.println("Processing task: " + taskInfo);
            // Actual task processing logic
        }
    }

Pros and cons
Advantages
- Graded queues reduce the pressure on any single queue
- Uses less memory than a Sorted Set
- Supports queue monitoring and task priorities
Disadvantages
- Delay accuracy is limited by the polling frequency
- Higher implementation complexity
- Multiple queues have to be maintained
- The expiry check and the queue operation are not atomic, so concurrency problems need explicit handling
3. Delay queue based on publish/subscribe (Pub/Sub)
Principle
This approach combines Redis publish/subscribe with a local time wheel algorithm to distribute and process delayed tasks. Task information is stored in Redis, while the time wheel schedules the tasks and publishes them when they are due.
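For readers unfamiliar with the time wheel idea, the following is a minimal, single-threaded sketch of a hashed wheel. It is not the timer used in this article's implementation (which simulates one with a scheduled executor) and not any library's API: the class SimpleTimeWheel is hypothetical, and it is advanced by calling tick() manually so the behaviour is easy to follow. Each task is hashed into a slot by its deadline, and a rounds counter records how many full turns of the wheel must pass before it fires.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal hashed time wheel, advanced manually via tick().
class SimpleTimeWheel {
    private static final class Entry {
        final Runnable task;
        long remainingRounds;   // full wheel turns left before the task is due

        Entry(Runnable task, long remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> buckets = new ArrayList<>();
    private final int wheelSize;
    private long currentTick = 0;

    SimpleTimeWheel(int wheelSize) {
        this.wheelSize = wheelSize;
        for (int i = 0; i < wheelSize; i++) {
            buckets.add(new ArrayList<>());
        }
    }

    // Schedule a task to fire after delayTicks ticks (at least one).
    void schedule(Runnable task, long delayTicks) {
        long ticks = Math.max(1, delayTicks);
        long deadline = currentTick + ticks;
        int slot = (int) (deadline % wheelSize);
        long rounds = (ticks - 1) / wheelSize;
        buckets.get(slot).add(new Entry(task, rounds));
    }

    // Advance one tick and run the tasks in the current slot whose rounds have elapsed.
    void tick() {
        currentTick++;
        List<Entry> bucket = buckets.get((int) (currentTick % wheelSize));
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = bucket.iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--;
            }
        }
        // Run after iterating so a task may safely schedule new work.
        for (Runnable task : due) {
            task.run();
        }
    }
}
```

Each tick only touches one slot, so the cost of advancing time does not depend on the total number of scheduled tasks. In the design described above, the task run by the wheel would publish the task ID to the Redis channel.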
Code implementation
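
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.StringRedisTemplate;

    public class RedisPubSubDelayQueue {
        private final StringRedisTemplate redisTemplate;
        private final String TASK_TOPIC = "delay_queue:task_channel";
        private final String TASK_HASH = "delay_queue:tasks";
        private final HashedWheelTimer timer;

        public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
            // Initialize the time wheel: tick 100 ms, wheel size 512
            this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
            // Start the message subscription
            subscribeTaskChannel();
        }

        /**
         * Add a delayed task.
         */
        public void addTask(String taskId, String taskInfo, long delaySeconds) {
            // Store the task information in Redis
            redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);
            // Add to the time wheel
            timer.newTimeout(timeout -> {
                // Publish a "task ready" message
                redisTemplate.convertAndSend(TASK_TOPIC, taskId);
            }, delaySeconds, TimeUnit.SECONDS);
            System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");
        }

        /**
         * Subscribe to the task channel.
         */
        private void subscribeTaskChannel() {
            MessageListener listener = (message, pattern) -> {
                String taskId = new String(message.getBody(), StandardCharsets.UTF_8);
                // Get the task information
                String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);
                if (taskInfo != null) {
                    // Handle the task, then delete it
                    processTask(taskId, taskInfo);
                    redisTemplate.opsForHash().delete(TASK_HASH, taskId);
                }
            };
            // subscribe() blocks the calling thread with some drivers (for example
            // Jedis), so it runs on a dedicated daemon thread instead of the constructor's.
            Thread subscriber = new Thread(() ->
                    redisTemplate.getConnectionFactory().getConnection()
                            .subscribe(listener, TASK_TOPIC.getBytes(StandardCharsets.UTF_8)),
                    "delay-queue-subscriber");
            subscriber.setDaemon(true);
            subscriber.start();
        }

        private void processTask(String taskId, String taskInfo) {
            System.out.println("Processing task: " + taskId + " - " + taskInfo);
            // Actual task processing logic
        }

        // Simulated HashedWheelTimer: same interface shape, backed by a scheduled executor
        public static class HashedWheelTimer {
            private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            private final long tickDuration;
            private final TimeUnit unit;

            public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {
                this.tickDuration = tickDuration;
                this.unit = unit;
            }

            public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {
                long delayMillis = timeUnit.toMillis(delay);
                scheduler.schedule(() -> task.run(null), delayMillis, TimeUnit.MILLISECONDS);
            }

            public interface TimerTask {
                void run(Timeout timeout);
            }

            public interface Timeout {
            }
        }
    }
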
Pros and cons
Advantages
- Triggers immediately, with no polling of Redis
- The time wheel algorithm is efficient
- Tasks can be subscribed to across applications
- Task scheduling and execution are separated, which reduces coupling
Disadvantages
- Depends on a local time wheel, so it is not a pure Redis implementation
- Pub/Sub does not persist messages, so messages may be lost
- The time wheel has to be rebuilt when the service restarts
- Subscribers must stay connected
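Rebuilding the time wheel after a restart could look roughly like the sketch below. It rests on an assumption that goes beyond the implementation in this section: each task's absolute execution time would have to be persisted (for example in an extra Redis hash), because the code above stores only the task information. The class WheelRecovery and its method are hypothetical; given the persisted times, it computes how long each task still has to wait so it can be scheduled again.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical recovery helper; assumes execution times were persisted per task ID.
class WheelRecovery {
    /**
     * @param executeTimes task ID -> absolute execution time in milliseconds
     * @param now          current time in milliseconds
     * @return task ID -> remaining delay in milliseconds (0 for overdue tasks)
     */
    static Map<String, Long> remainingDelays(Map<String, Long> executeTimes, long now) {
        Map<String, Long> delays = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : executeTimes.entrySet()) {
            // Tasks that became due while the service was down fire immediately.
            delays.put(entry.getKey(), Math.max(0L, entry.getValue() - now));
        }
        return delays;
    }
}
```

On startup, each entry of the returned map would be handed to the timer with its remaining delay, so overdue tasks run right away and the rest keep their original deadlines.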
4. Delay queue based on Redis Stream
Principle
Stream, introduced in Redis 5.0, is a data structure designed for message queues. Combining its consumer groups with its acknowledgement mechanism makes it possible to build a reliable delay queue.
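The role of the acknowledgement mechanism can be illustrated with a small in-memory model. This is a deliberately simplified sketch and not the Redis implementation: the class StreamGroupModel is hypothetical, it models a single consumer group, and it uses list indexes in place of real stream IDs. It mirrors three ideas: reading new entries marks them as pending, acknowledging removes them from the pending list, and anything left pending was delivered but never confirmed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of one stream with one consumer group.
class StreamGroupModel {
    private final List<String> entries = new ArrayList<>();        // index acts as the entry ID
    private final Map<Integer, String> pending = new TreeMap<>();  // entry ID -> consumer holding it
    private int nextUndelivered = 0;

    // Like XADD: append an entry and return its ID.
    int add(String payload) {
        entries.add(payload);
        return entries.size() - 1;
    }

    // Like XREADGROUP with ">": deliver up to count never-delivered entries
    // to this consumer and record them as pending.
    List<Integer> readNew(String consumer, int count) {
        List<Integer> ids = new ArrayList<>();
        while (nextUndelivered < entries.size() && ids.size() < count) {
            pending.put(nextUndelivered, consumer);
            ids.add(nextUndelivered++);
        }
        return ids;
    }

    // Like XACK: true if the entry was pending and is now confirmed.
    boolean ack(int id) {
        return pending.remove(id) != null;
    }

    // Like XPENDING: IDs that were delivered but not yet acknowledged.
    List<Integer> pendingIds() {
        return new ArrayList<>(pending.keySet());
    }
}
```

If a consumer crashes after reading but before acknowledging, its entries stay in the pending list, which is what allows another consumer to take them over later.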
Code implementation

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.springframework.data.redis.connection.stream.Consumer;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.ReadOffset;
    import org.springframework.data.redis.connection.stream.StreamOffset;
    import org.springframework.data.redis.connection.stream.StreamReadOptions;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.StringRedisTemplate;

    public class RedisStreamDelayQueue {
        private final StringRedisTemplate redisTemplate;
        private final String delayQueueKey = "delay_queue:stream";
        private final String consumerGroup = "delay_queue_consumers";
        private final String consumerId = UUID.randomUUID().toString();

        public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
            // Create the consumer group (and the stream itself if it does not exist)
            try {
                redisTemplate.execute((RedisCallback<String>) connection ->
                        connection.streamCommands().xGroupCreate(
                                delayQueueKey.getBytes(StandardCharsets.UTF_8),
                                consumerGroup, ReadOffset.from("0"), true));
            } catch (Exception e) {
                // The consumer group may already exist
                System.out.println("Consumer group may already exist: " + e.getMessage());
            }
        }

        /**
         * Add a delayed task.
         */
        public void addTask(String taskInfo, long delaySeconds) {
            long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
            appendTask(executeTime, taskInfo);
            System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);
        }

        private void appendTask(long executeTime, String taskInfo) {
            Map<String, String> task = new HashMap<>();
            task.put("executeTime", String.valueOf(executeTime));
            task.put("taskInfo", taskInfo);
            redisTemplate.opsForStream().add(delayQueueKey, task);
        }

        /**
         * Get the tasks that are due.
         */
        public List<String> pollTasks() {
            long now = System.currentTimeMillis();
            List<String> readyTasks = new ArrayList<>();
            // Read up to 10 messages not yet delivered to this group (the ">" offset)
            List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read(
                    Consumer.from(consumerGroup, consumerId),
                    StreamReadOptions.empty().count(10),
                    StreamOffset.create(delayQueueKey, ReadOffset.lastConsumed()));
            if (records != null) {
                for (MapRecord<String, Object, Object> record : records) {
                    Map<Object, Object> value = record.getValue();
                    long executeTime = Long.parseLong((String) value.get("executeTime"));
                    String taskInfo = (String) value.get("taskInfo");
                    if (executeTime <= now) {
                        // The task is due: hand it to the caller
                        readyTasks.add(taskInfo);
                    } else {
                        // The task is not due yet: append a fresh copy so it is read again later
                        appendTask(executeTime, taskInfo);
                    }
                    // Acknowledge the message, then delete the old entry so the
                    // stream does not grow with every poll
                    redisTemplate.opsForStream().acknowledge(delayQueueKey, consumerGroup, record.getId());
                    redisTemplate.opsForStream().delete(delayQueueKey, record.getId());
                }
            }
            return readyTasks;
        }

        /**
         * Start the task processor.
         */
        public void startTaskProcessor() {
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    for (String task : pollTasks()) {
                        processTask(task);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 0, 1, TimeUnit.SECONDS);
        }

        private void processTask(String taskInfo) {
            System.out.println("Processing task: " + taskInfo);
            // Actual task processing logic
        }
    }

Pros and cons
Advantages
- Consumer groups and message acknowledgement provide reliable message processing
- Built-in message persistence
- Supports parallel processing by multiple consumers
- Message IDs contain timestamps, which makes ordering easy
Disadvantages
- Requires Redis 5.0 or later
- Relatively complex to implement
- Expired tasks still have to be fetched by polling
- Handling tasks that are not yet due is cumbersome
Performance comparison and selection suggestions
Implementation | Performance | Reliability | Implementation complexity | Memory usage | Applicable scenarios |
---|---|---|---|---|---|
Sorted Set | ★★★★☆ | ★★★☆☆ | Low | Medium | Moderate task volume, precise scheduling required |
List + polling | ★★★★★ | ★★★☆☆ | Medium | Low | High concurrency, low delay-accuracy requirements |
Pub/Sub + time wheel | ★★★★★ | ★★☆☆☆ | High | Low | High real-time requirements, can tolerate task loss on service restart |
Stream | ★★★☆☆ | ★★★★★ | High | Medium | High reliability requirements, message acknowledgement required |
Summary
In practice, choose an implementation based on system scale, performance requirements, reliability requirements and implementation complexity, or combine several approaches into a delay queue solution that better fits your business needs. Whichever implementation you choose, pay attention to reliability, performance and monitoring so that the delay queue runs stably in production.