SoFunction
Updated on 2025-04-20

Summary of four delay queue implementation methods in Redis

A delay queue is a special message queue that allows messages to be consumed after a specified time. In microservice architecture, e-commerce systems and task scheduling scenarios, delay queues play a key role. For example, automatic cancellation of order timeout, regular reminder, delayed payment, etc. all rely on delay queues to implement.

As a high-performance in-memory database, Redis has the characteristics of atomic operations, rich data structures and simple and easy to use. This article will introduce four ways to implement distributed delay queues based on Redis.

1. Delay queue based on Sorted Set

principle

Using Redis's Sorted Set (ordered set), the message ID is used as member and the execution time stamp is stored as score. passZRANGEBYSCOREThe command can obtain tasks that reach execution time.

Code implementation

public class RedisZSetDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String queueKey = "delay_queue:tasks";
    
    public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {
         = redisTemplate;
    }
    
    /**
      * Add delay task
      * @param taskId Task ID
      * @param taskInfo task information (JSON string)
      * @param delayTime Delay Time (seconds)
      */
    public void addTask(String taskId, String taskInfo, long delayTime) {
        // Calculate execution time        long executeTime = () + delayTime * 1000;
        
        // Store task details        ().put("delay_queue:details", taskId, taskInfo);
        
        // Add to delay queue        ().add(queueKey, taskId, executeTime);
        
        ("Task added: " + taskId + ", will execute at: " + executeTime);
    }
    
    /**
      * Poll to get expired tasks
      */
    public List<String> pollTasks() {
        long now = ();
        
        // Get the task before the current time        Set<String> taskIds = ()
                .rangeByScore(queueKey, 0, now);
        
        if (taskIds == null || ()) {
            return ();
        }
        
        // Get task details        List<String> tasks = new ArrayList<>();
        for (String taskId : taskIds) {
            String taskInfo = (String) ()
                    .get("delay_queue:details", taskId);
            
            if (taskInfo != null) {
                (taskInfo);
                
                // Remove task from collection and details                ().remove(queueKey, taskId);
                ().delete("delay_queue:details", taskId);
            }
        }
        
        return tasks;
    }
    
    // Timing task example    public void startTaskProcessor() {
        ScheduledExecutorService scheduler = (1);
        (() -> {
            try {
                List<String> tasks = pollTasks();
                for (String task : tasks) {
                    processTask(task);
                }
            } catch (Exception e) {
                ();
            }
        }, 0, 1, );
    }
    
    private void processTask(String taskInfo) {
        ("Processing task: " + taskInfo);
        // Actual task processing logic    }
}

Pros and cons

advantage

  • Simple to implement and easy to understand
  • Tasks are automatically sorted by execution time
  • Supports accurate time control

shortcoming

  • Need to poll to obtain expired tasks, consume CPU resources
  • In a large number of tasks,ZRANGEBYSCOREOperations may affect performance
  • There is no consumption confirmation mechanism, and additional implementation is required

2. Delay queue based on List + timed polling

principle

This method uses multiple Lists as storage containers, and assigns tasks to different queues according to different delay times. Move the expired task to an immediate execution queue by timed polling each queue.

Code implementation

public class RedisListDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String readyQueueKey = "delay_queue:ready";  // Pending queue    private final Map<Integer, String> delayQueueKeys;  // Delay queue, grading by delay time    
    public RedisListDelayQueue(StringRedisTemplate redisTemplate) {
         = redisTemplate;
        
        // Initialize queues with different delay levels        delayQueueKeys = new HashMap<>();
        (5, "delay_queue:delay_5s");     // 5 seconds        (60, "delay_queue:delay_1m");    // 1 minute        (300, "delay_queue:delay_5m");   // 5 minutes        (1800, "delay_queue:delay_30m"); // 30 minutes    }
    
    /**
      * Add delay task
      */
    public void addTask(String taskInfo, int delaySeconds) {
        // Select the appropriate delay queue        String queueKey = selectDelayQueue(delaySeconds);
        
        // Task metadata, including task information and execution time        long executeTime = () + delaySeconds * 1000;
        String taskData = executeTime + ":" + taskInfo;
        
        // Add to delay queue        ().rightPush(queueKey, taskData);
        ("Task added to " + queueKey + ": " + taskData);
    }
    
    /**
      * Select the right delay queue
      */
    private String selectDelayQueue(int delaySeconds) {
        // Find the closest delay level        int closestDelay = ().stream()
                .filter(delay -> delay >= delaySeconds)
                .min(Integer::compareTo)
                .orElse((()));
                
        return (closestDelay);
    }
    
    /**
      * Move expired tasks to pending queue
      */
    public void moveTasksToReadyQueue() {
        long now = ();
        
        // traverse all delay queues        for (String queueKey : ()) {
            boolean hasMoreTasks = true;
            
            while (hasMoreTasks) {
                // View queue head tasks                String taskData = ().index(queueKey, 0);
                if (taskData == null) {
                    hasMoreTasks = false;
                    continue;
                }
                
                // parse the task execution time                long executeTime = ((":", 2)[0]);
                
                // Check whether it expires                if (executeTime <= now) {
                    // Atomically remove queue head tasks through LPOP                    String task = ().leftPop(queueKey);
                    
                    // The task may be processed by other processes, check again                    if (task != null) {
                        // Extract task information and add it to the pending queue                        String taskInfo = (":", 2)[1];
                        ().rightPush(readyQueueKey, taskInfo);
                        ("Task moved to ready queue: " + taskInfo);
                    }
                } else {
                    // The queue head task has not expired, so there is no need to check the following tasks                    hasMoreTasks = false;
                }
            }
        }
    }
    
    /**
      * Get pending tasks
      */
    public String getReadyTask() {
        return ().leftPop(readyQueueKey);
    }
    
    /**
      * Start the task processor
      */
    public void startTaskProcessors() {
        // Timed expiration task        ScheduledExecutorService scheduler = (2);
        
        // Move task thread        (this::moveTasksToReadyQueue, 0, 1, );
        
        // Handle task threads        (() -> {
            String task = getReadyTask();
            if (task != null) {
                processTask(task);
            }
        }, 0, 100, );
    }
    
    private void processTask(String taskInfo) {
        ("Processing task: " + taskInfo);
        // Actual task processing logic    }
}

Pros and cons

advantage

  • Graded queue design to reduce single queue pressure
  • Compared to Sorted Set, it takes up less memory
  • Support queue monitoring and task priority

shortcoming

  • Delay time accuracy is affected by polling frequency
  • High complexity
  • Multiple queues need to be maintained
  • Time judgment and queue operation are non-atomic, and concurrency problems need to be dealt with specifically

3. Delay queue based on publish/subscription (Pub/Sub)

principle

Combining Redis publish/subscribe function and local time round algorithm, the distribution and processing of delayed tasks are realized. Task information is stored in Redis, and the time round is responsible for scheduling and publishing of tasks.

Code implementation

public class RedisPubSubDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String TASK_TOPIC = "delay_queue:task_channel";
    private final String TASK_HASH = "delay_queue:tasks";
    
    private final HashedWheelTimer timer;
    
    public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {
         = redisTemplate;
        
        // Initialize the time wheel, scale 100ms, wheel size 512         = new HashedWheelTimer(100, , 512);
        
        // Start message subscription        subscribeTaskChannel();
    }
    
    /**
      * Add delay task
      */
    public void addTask(String taskId, String taskInfo, long delaySeconds) {
        //Storing task information to Redis        ().put(TASK_HASH, taskId, taskInfo);
        
        // Add to time round        (timeout -> {
            // Post a task ready message            (TASK_TOPIC, taskId);
        }, delaySeconds, );
        
        ("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");
    }
    
    /**
      * Subscribe to task channel
      */
    private void subscribeTaskChannel() {
        ().getConnection().subscribe(
            (message, pattern) -> {
                String taskId = new String(());
                
                // Get task information                String taskInfo = (String) ().get(TASK_HASH, taskId);
                
                if (taskInfo != null) {
                    // Handle tasks                    processTask(taskId, taskInfo);
                    
                    // Delete the task                    ().delete(TASK_HASH, taskId);
                }
            }, 
            TASK_TOPIC.getBytes()
        );
    }
    
    private void processTask(String taskId, String taskInfo) {
        ("Processing task: " + taskId + " - " + taskInfo);
        // Actual task processing logic    }
    
    // Simulate the HashedWheelTimer class    public static class HashedWheelTimer {
        private final ScheduledExecutorService scheduler = (1);
        private final long tickDuration;
        private final TimeUnit unit;
        
        public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {
             = tickDuration;
             = unit;
        }
        
        public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {
            long delayMillis = (delay);
            (
                () -> (null), 
                delayMillis, 
                
            );
        }
        
        public interface TimerTask {
            void run(Timeout timeout);
        }
        
        public interface Timeout {
        }
    }
}

Pros and cons

advantage

  • Instant triggering, no polling required
  • Efficient time-wheel algorithm
  • Can subscribe to tasks across applications
  • Separate task scheduling and execution to reduce coupling

shortcoming

  • Relying on local time round, non-pure Redis implementation
  • Pub/Sub mode has no message persistence, and messages may be lost
  • Rebuilding time round is required when the service restarts
  • Subscribers need to stay connected

4. Delay queue based on Redis Stream

principle

Redis 5.0 introduced Stream is a powerful data structure designed for message queues. Combining Stream's consumption group and confirmation mechanism, a reliable delay queue can be built.

Code implementation

public class RedisStreamDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String delayQueueKey = "delay_queue:stream";
    private final String consumerGroup = "delay_queue_consumers";
    private final String consumerId = ().toString();
    
    public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {
         = redisTemplate;
        
        // Create a consumer group        try {
            ((RedisCallback<String>) connection -> {
                ().xGroupCreate(
                    (), 
                    consumerGroup, 
                    ("0"), 
                    true
                );
                return "OK";
            });
        } catch (Exception e) {
            // The consumer group may already exist            ("Consumer group may already exist: " + ());
        }
    }
    
    /**
      * Add delay task
      */
    public void addTask(String taskInfo, long delaySeconds) {
        long executeTime = () + delaySeconds * 1000;
        
        Map<String, Object> task = new HashMap<>();
        ("executeTime", (executeTime));
        ("taskInfo", taskInfo);
        
        ().add(delayQueueKey, task);
        ("Task added: " + taskInfo + ", execute at: " + executeTime);
    }
    
    /**
      * Get the task to be executed
      */
    public List<String> pollTasks() {
        long now = ();
        List<String> readyTasks = new ArrayList<>();
        
        // Read unprocessed messages        List<MapRecord<String, Object, Object>> records = (
            (RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {
                return ().xReadGroup(
                    (),
                    (),
                    ().count(10),
                    ((), (">"))
                );
            }
        );
        
        if (records != null) {
            for (MapRecord<String, Object, Object> record : records) {
                String messageId = ().getValue();
                Map<Object, Object> value = ();
                
                long executeTime = ((String) ("executeTime"));
                String taskInfo = (String) ("taskInfo");
                
                // Check whether the task expires                if (executeTime <= now) {
                    (taskInfo);
                    
                    // Confirm that the message has been processed                    ((RedisCallback<String>) connection -> {
                        ().xAck(
                            (),
                            (),
                            ()
                        );
                        return "OK";
                    });
                    
                    // Optional: Delete messages from stream                    ().delete(delayQueueKey, messageId);
                } else {
                    // The task has not expired, put back to the queue                    ((RedisCallback<String>) connection -> {
                        ().xAck(
                            (),
                            (),
                            ()
                        );
                        return "OK";
                    });
                    
                    // Re-add task (optional: use the delayed re-enqueue strategy)                    Map<String, Object> newTask = new HashMap<>();
                    ("executeTime", (executeTime));
                    ("taskInfo", taskInfo);
                    ().add(delayQueueKey, newTask);
                }
            }
        }
        
        return readyTasks;
    }
    
    /**
      * Start the task processor
      */
    public void startTaskProcessor() {
        ScheduledExecutorService scheduler = (1);
        (() -> {
            try {
                List<String> tasks = pollTasks();
                for (String task : tasks) {
                    processTask(task);
                }
            } catch (Exception e) {
                ();
            }
        }, 0, 1, );
    }
    
    private void processTask(String taskInfo) {
        ("Processing task: " + taskInfo);
        // Actual task processing logic    }
}

Pros and cons

advantage

  • Support consumer groups and message acknowledgement, providing reliable message processing
  • Built-in message persistence mechanism
  • Supports multi-consumer parallel processing
  • Message ID contains timestamps for easy sorting

shortcoming

  • Redis version 5.0+ required
  • Relatively complex implementation
  • Still need to poll to get expired tasks
  • The handling of unexpired tasks is relatively cumbersome

Performance comparison and selection suggestions

Implementation method performance reliability Implement complexity Memory usage Applicable scenarios
Sorted Set ★★★★☆ ★★★☆☆ Low middle Moderate task volume, precise scheduling is required
List + polling ★★★★★ ★★★☆☆ middle Low High concurrency, low delay accuracy requirements
Pub/Sub + Time Wheel ★★★★★ ★★☆☆☆ high Low High real-time requirements, tolerating service restart loss
Stream ★★★☆☆ ★★★★★ high middle High reliability requirements, message confirmation is required

Summarize

In practical applications, you can choose based on factors such as system scale, performance requirements, reliability requirements and implementation complexity, or you can combine multiple ways to create a delay queue solution that is more in line with business needs. No matter which implementation you choose, you should focus on reliability, performance and monitoring aspects to ensure that the delay queue runs stably in the production environment.

This is the article about the four delay queue implementation methods in Redis. For more related Redis delay queue content, please search for my previous article or continue browsing the related articles below. I hope everyone will support me in the future!