SoFunction
Updated on 2025-03-01

Implementing a delay queue with Go and Redis

Preface

A delay queue is a very useful data structure. We often need to delay pushing or processing a message: sending an SMS after 60 seconds, closing an order after 30 minutes, retrying a failed message consumption after a delay, and so on.

A delay queue generally relies on an underlying ordered structure, such as a heap. Redis provides the zset (sorted set) data type, whose underlying implementation is a hash table plus a skip list. It is also an ordered structure, so this article uses Go and Redis to implement a delay queue.

Redis itself does not support delay queues, so we only implement a relatively simple one. Redis is also not suited to large-scale message accumulation, so this approach only fits simpler scenarios. If you need a more powerful and stable message queue, you can use RocketMQ, a message queue with built-in delayed messages.

Let’s first determine the goals we want to achieve:

  • Messages must be consumed at least once
  • Multiple producers
  • Multiple consumers

Then we define a simple interface:

  • Push(msg) error: add a message to the queue
  • Consume(topic, batchSize, func(msg) error): consume messages

Simple implementation

  • Each topic can be consumed by at most one consumer, because the topic is not partitioned
  • Multiple producers can push at the same time, because the Push operation is atomic
  • A message is deleted only after the consumer function returns a nil error, which ensures that the message is consumed at least once

Define the message

The message structure is modeled on Kafka's message structure:

  • Topic can be the name of a queue
  • Key is the unique identifier of the message and must not repeat within a queue
  • Body is the content of the message
  • Delay is the delay time of the message
  • ReadyTime is the time when the message is ready to be executed
// Msg is a message
type Msg struct {
   Topic     string        // Topic of the message
   Key       string        // Key of the message
   Body      []byte        // Body of the message
   Delay     time.Duration // Delay before the message becomes ready (second precision)
   ReadyTime time.Time     // Time at which the message is ready to be executed (now + delay)
}

Push

We store the message Body in a Hash and the message ReadyTime in a ZSet, so we need a simple Lua script to make these two operations atomic.

The script also does not overwrite an existing message that has the same key.

const delayQueuePushRedisScript = `
-- KEYS[1]: topicZSet
-- KEYS[2]: topicHash
-- ARGV[1]: MessageKey
-- ARGV[2]: MessageBody
-- ARGV[3]: Time for message to be executed

local topicZSet = KEYS[1]
local topicHash = KEYS[2]
local key = ARGV[1]
local body = ARGV[2]
local readyTime = tonumber(ARGV[3])

-- Add readyTime to the zset; NX leaves the score of an existing member unchanged
local count = redis.call("zadd", topicZSet, "NX", readyTime, key)
-- The message already exists
if count == 0 then
   return 0
end
-- Add body to the hash
redis.call("hsetnx", topicHash, key, body)
return 1
`
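func (q *SimpleRedisDelayQueue) Push(ctx context.Context, msg *Msg) error {
   // If ReadyTime is set, use it
   var readyTime int64
   if !msg.ReadyTime.IsZero() {
      readyTime = msg.ReadyTime.Unix()
   } else {
      // Otherwise use Delay
      readyTime = time.Now().Add(msg.Delay).Unix()
   }
   // q.pushScript (a *redis.Script built from delayQueuePushRedisScript) and q.client
   // are assumed field names
   success, err := q.pushScript.Run(ctx, q.client, []string{q.topicZSet(msg.Topic), q.topicHash(msg.Topic)},
      msg.Key, msg.Body, readyTime).Bool()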
   if err != nil {
      return err
   }
   if !success {
      return ErrDuplicateMessage
   }
   return nil
}
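The intended Push semantics (store both parts together, never overwrite an existing key) can be modeled without Redis. The sketch below uses two Go maps in place of the ZSet and the Hash; memStore and its methods are illustrative names, and the model ignores concurrency, which Redis handles by running the Lua script atomically.

```go
package main

import "fmt"

// memStore is an in-memory stand-in for the two Redis keys of one topic.
type memStore struct {
	zset map[string]int64  // message key -> ready time (the ZSet scores)
	hash map[string][]byte // message key -> body (the Hash fields)
}

func newMemStore() *memStore {
	return &memStore{zset: map[string]int64{}, hash: map[string][]byte{}}
}

// push stores the message only if the key is new. For a duplicate key it
// returns false and leaves the existing ready time and body untouched.
func (s *memStore) push(key string, body []byte, readyTime int64) bool {
	if _, exists := s.zset[key]; exists {
		return false
	}
	s.zset[key] = readyTime
	s.hash[key] = body
	return true
}

func main() {
	s := newMemStore()
	fmt.Println(s.push("order-1", []byte("close"), 100))      // true
	fmt.Println(s.push("order-1", []byte("other"), 200))      // false
	fmt.Println(s.zset["order-1"], string(s.hash["order-1"])) // 100 close
}
```

The false return value corresponds to the case where Push returns ErrDuplicateMessage.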

Consume

The second parameter, batchSize, sets how many ready messages are fetched per batch, which reduces the number of network requests.

fn is the function that processes a message. It returns an error; a nil return value means the message was consumed successfully, and the delete script is then called to remove the message (its entries in the ZSet and the Hash must be deleted atomically).

const delayQueueDelRedisScript = `
-- KEYS[1]: topicZSet
-- KEYS[2]: topicHash
-- ARGV[1]: MessageKey

local topicZSet = KEYS[1]
local topicHash = KEYS[2]
local key = ARGV[1]

-- Delete this message's entries from the zset and the hash
redis.call("zrem", topicZSet, key)
redis.call("hdel", topicHash, key)
return 1
`
func (q *SimpleRedisDelayQueue) Consume(topic string, batchSize int, fn func(msg *Msg) error) {
   for {
      // Fetch a batch of messages that are ready for execution
      now := time.Now().Unix()
      zs, err := q.client.ZRangeByScoreWithScores(context.Background(), q.topicZSet(topic), &redis.ZRangeBy{
         Min:   "-inf",
         Max:   strconv.Itoa(int(now)),
         Count: int64(batchSize),
      }).Result()
      // If the fetch fails or no message is ready, sleep for one second
      if err != nil || len(zs) == 0 {
         time.Sleep(time.Second)
         continue
      }
      // Iterate over each message
      for _, z := range zs {
         key := z.Member.(string)
         // Get the body of the message
         body, err := q.client.HGet(context.Background(), q.topicHash(topic), key).Bytes()
         if err != nil {
            continue
         }

         // Process the message
         err = fn(&Msg{
            Topic:     topic,
            Key:       key,
            Body:      body,
            ReadyTime: time.Unix(int64(z.Score), 0),
         })
         if err != nil {
            continue
         }

         // If the message is processed successfully, delete it
         // (q.delScript and q.client are assumed field names)
         q.delScript.Run(context.Background(), q.client, []string{q.topicZSet(topic), q.topicHash(topic)}, key)
      }
   }
}

Existing problems

If multiple goroutines call the Consume function at the same time, they all pull the same ready messages, so messages are consumed repeatedly.
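The reason is that the polling query only reads and does not claim anything. A tiny in-memory sketch, with a made-up readyKeys helper standing in for the ZSet range query, shows two consumers receiving the same key when both poll before either one has deleted it.

```go
package main

import (
	"fmt"
	"sort"
)

// readyKeys returns the keys whose ready time is <= now, sorted by name.
// Like the polling query in Consume, it only reads and never claims a key.
func readyKeys(zset map[string]int64, now int64) []string {
	var keys []string
	for k, t := range zset {
		if t <= now {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	zset := map[string]int64{"sms-1": 10, "sms-2": 99}
	// Two consumers poll before either one has deleted the message.
	first := readyKeys(zset, 20)
	second := readyKeys(zset, 20)
	fmt.Println(first, second) // [sms-1] [sms-1]
}
```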

Multi-consumer implementation

  • Each topic can be consumed by several consumers, one per partition, because the topic is now partitioned.

Define the message

  • We have added a Partition field to represent the partition number of the message
// Msg is a message
type Msg struct {
   Topic     string        // Topic of the message
   Key       string        // Key of the message
   Body      []byte        // Body of the message
   Partition int           // Partition number
   Delay     time.Duration // Delay before the message becomes ready (second precision)
   ReadyTime time.Time     // Time at which the message is ready to be executed
}

Push

The code is the same as Push of SimpleRedisDelayQueue, except that the Partition field of Msg selects the partition of the topic. Each partition has its own ZSet and Hash key:

func (q *PartitionRedisDelayQueue) topicZSet(topic string, partition int) string {
   return fmt.Sprintf("%s:%d:z", topic, partition)
}

func (q *PartitionRedisDelayQueue) topicHash(topic string, partition int) string {
   return fmt.Sprintf("%s:%d:h", topic, partition)
}

Consume

The code is the same as Consume of SimpleRedisDelayQueue, except that Consume takes an additional partition parameter that specifies which partition to consume.

func (q *PartitionRedisDelayQueue) Consume(topic string, batchSize, partition int, fn func(msg *Msg) error) {
    // ...
}

Existing problems

A big problem is that partitions must be specified manually instead of being allocated automatically. For Push this is easy to solve: you can partition by hashing the Key, for example with murmur3. For Consume it is more complicated, because we have to record which partition is being consumed by which consumer. If you really need such complex scenarios, it is recommended to use a message queue such as RocketMQ or Kafka.
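Choosing the partition from the Key on the Push side could look like the sketch below. It uses FNV-1a from Go's standard library in place of murmur3, which would need a third-party package; partitionFor is a made-up helper name, and partitions is assumed to be greater than zero.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition in [0, partitions).
// FNV-1a stands in for murmur3 here; partitions must be > 0.
func partitionFor(key string, partitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(partitions))
}

func main() {
	for _, key := range []string{"order-1", "order-2", "order-3"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 4))
	}
}
```

The same key always maps to the same partition as long as the partition count stays fixed; changing the count remaps existing keys.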

Summary

  • Redis's ZSet makes it easy to implement a high-performance message queue
  • However, a message queue built on Redis's ZSet is not suitable for scenarios where large numbers of messages accumulate, and automatic partition consumption is more complicated to implement
  • It is suitable for scenarios where the message volume is not very large and the requirements are not very complicated
  • If you need to handle large message backlogs and stable multi-consumer support, you can use RocketMQ, which has built-in delayed messages
