Preface
Delay queues are a very useful data structure. We often run into scenarios where messages need to be pushed and processed after a delay: sending an SMS 60 seconds later, closing an unpaid order after 30 minutes, retrying a message whose consumption failed, and so on.
A delay queue generally relies on an ordered underlying structure such as a heap. Redis happens to provide the `zset` data type, whose underlying implementation is a hash table plus a skip list, which is also an ordered structure. This article therefore uses Go + Redis to implement a delay queue.
Of course, Redis itself does not natively support delay queues, so what we implement here is a relatively simple one. Redis is also not well suited to large message backlogs, so this approach only fits simpler scenarios. If you need a more powerful and stable solution, consider RocketMQ, a message queue that comes with built-in delayed messages.
Let’s first determine the goals we want to achieve:
- Messages must be consumed at least once
- Multiple producers
- Multiple consumers
Then we define a simple interface:
- Push(msg) error: add a message to the queue
- Consume(topic, batchSize, func(msg) error): consume messages
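A minimal sketch of this interface in Go (the exact signatures, including the context parameter, are assumptions based on the descriptions above):

```go
import "context"

// DelayQueue is a rough sketch of the interface described above;
// Msg is the message type defined in the next section.
type DelayQueue interface {
	// Push adds a message to the queue; it returns an error if the
	// message could not be stored (for example, a duplicate key).
	Push(ctx context.Context, msg *Msg) error
	// Consume pulls ready messages of a topic in batches of batchSize
	// and passes each one to fn; a nil error from fn means the message
	// was consumed successfully.
	Consume(topic string, batchSize int, fn func(msg *Msg) error)
}
```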
Simple implementation
- Each topic can be consumed by at most one consumer, because topics are not partitioned
- Multiple producers can push concurrently, because the Push operation is atomic
- A message is deleted only after the consume callback returns a nil error, which guarantees that each message is consumed at least once
Define the message
The message structure is modeled on Kafka's message:
- Topic can be the name of a queue
- Key is the unique identifier of the message and cannot be repeated in a queue.
- Body is the content of the message
- Delay is the delay time of the message
- ReadyTime is the time when the message is ready to be executed
```go
// Msg message
type Msg struct {
	Topic     string        // Topic of the message
	Key       string        // Key of the message
	Body      []byte        // Body of the message
	Delay     time.Duration // Delay time (seconds)
	ReadyTime time.Time     // Time at which the message should be executed (now + delay)
}
```
Push
Since we store the message's Body in a Hash and the message's ReadyTime in a ZSet, we need a small Lua script to make these two operations atomic.
At the same time, we do not overwrite a message whose key already exists in the queue.
```go
const delayQueuePushRedisScript = `
-- KEYS[1]: topicZSet
-- KEYS[2]: topicHash
-- ARGV[1]: the message's Key
-- ARGV[2]: the message's Body
-- ARGV[3]: the time at which the message should be executed

local topicZSet = KEYS[1]
local topicHash = KEYS[2]
local key = ARGV[1]
local body = ARGV[2]
local readyTime = tonumber(ARGV[3])

-- add the message's readyTime to the zset
local count = redis.call("zadd", topicZSet, readyTime, key)
-- the message already exists
if count == 0 then
	return 0
end

-- add the message's body to the hash
redis.call("hsetnx", topicHash, key, body)
return 1
`
```
```go
func (q *SimpleRedisDelayQueue) Push(ctx context.Context, msg *Msg) error {
	// if ReadyTime is set, use ReadyTime
	var readyTime int64
	if !msg.ReadyTime.IsZero() {
		readyTime = msg.ReadyTime.Unix()
	} else {
		// otherwise use Delay
		readyTime = time.Now().Add(msg.Delay).Unix()
	}

	// q.client is the go-redis client; topicZSet/topicHash build the Redis keys for the topic
	success, err := q.client.Eval(ctx, delayQueuePushRedisScript,
		[]string{q.topicZSet(msg.Topic), q.topicHash(msg.Topic)},
		msg.Key, msg.Body, readyTime).Bool()
	if err != nil {
		return err
	}
	if !success {
		return ErrDuplicateMessage
	}
	return nil
}
```
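For illustration, pushing an order-close message with a 30-minute delay could look like the following sketch (NewSimpleRedisDelayQueue and the redisClient variable are assumptions; the constructor is not shown in this article):

```go
// Hypothetical usage; NewSimpleRedisDelayQueue is an assumed constructor
// that wraps a go-redis client.
q := NewSimpleRedisDelayQueue(redisClient)

err := q.Push(context.Background(), &Msg{
	Topic: "order-close",
	Key:   "order-10001",
	Body:  []byte(`{"order_id": 10001}`),
	Delay: 30 * time.Minute,
})
if errors.Is(err, ErrDuplicateMessage) {
	// a message with the same key is already queued; ignore or log it
}
```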
Consume
The second parameter, `batchSize`, controls how many ready messages are fetched per request, reducing the number of network round trips. `fn` is the function that processes each message; it returns an `error`, and a `nil` return means the message was consumed successfully, after which the delete script is called to remove it (the entries in the ZSet and the Hash must be deleted atomically).
```go
const delayQueueDelRedisScript = `
-- KEYS[1]: topicZSet
-- KEYS[2]: topicHash
-- ARGV[1]: the message's Key

local topicZSet = KEYS[1]
local topicHash = KEYS[2]
local key = ARGV[1]

-- remove this message from both the zset and the hash
redis.call("zrem", topicZSet, key)
redis.call("hdel", topicHash, key)
return 1
`
```
```go
func (q *SimpleRedisDelayQueue) Consume(topic string, batchSize int, fn func(msg *Msg) error) {
	for {
		// fetch a batch of messages that are ready for execution
		now := time.Now().Unix()
		zs, err := q.client.ZRangeByScoreWithScores(context.Background(), q.topicZSet(topic), &redis.ZRangeBy{
			Min:   "-inf",
			Max:   strconv.Itoa(int(now)),
			Count: int64(batchSize),
		}).Result()
		// if the fetch failed or no message is ready, sleep for one second
		if err != nil || len(zs) == 0 {
			time.Sleep(time.Second)
			continue
		}

		// iterate over the fetched messages
		for _, z := range zs {
			key := z.Member.(string)

			// get the body of the message
			body, err := q.client.HGet(context.Background(), q.topicHash(topic), key).Bytes()
			if err != nil {
				continue
			}

			// process the message
			err = fn(&Msg{
				Topic:     topic,
				Key:       key,
				Body:      body,
				ReadyTime: time.Unix(int64(z.Score), 0),
			})
			if err != nil {
				continue
			}

			// the message was processed successfully, so delete it
			q.client.Eval(context.Background(), delayQueueDelRedisScript,
				[]string{q.topicZSet(topic), q.topicHash(topic)}, key)
		}
	}
}
```
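For illustration, a consumer could be started like this (Consume blocks in an endless loop, so it is typically run in its own goroutine; the q variable is the same assumed queue instance as above):

```go
// Hypothetical usage: process up to 10 ready messages per round.
go q.Consume("order-close", 10, func(msg *Msg) error {
	log.Printf("closing order %s, body: %s", msg.Key, msg.Body)
	// returning nil lets the queue delete the message;
	// returning an error leaves it in Redis to be retried
	return nil
})
```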
Existing problems
If multiple threads call the `Consume` function for the same topic at the same time, they will all pull the same ready messages, causing messages to be consumed repeatedly.
Multi-consumer implementation
- Each topic is partitioned, so it can be consumed by multiple consumers, with at most one consumer per partition
Define the message
- We have added a Partition field to represent the partition number of the message
```go
// Msg message
type Msg struct {
	Topic     string        // Topic of the message
	Key       string        // Key of the message
	Body      []byte        // Body of the message
	Partition int           // Partition number
	Delay     time.Duration // Delay time (seconds)
	ReadyTime time.Time     // Time at which the message should be executed
}
```
Push
The code is the same as Push of SimpleRedisDelayQueue, except that we will use the Partition field in Msg to partition the topic.
```go
func (q *PartitionRedisDelayQueue) topicZSet(topic string, partition int) string {
	return fmt.Sprintf("%s:%d:z", topic, partition)
}

func (q *PartitionRedisDelayQueue) topicHash(topic string, partition int) string {
	return fmt.Sprintf("%s:%d:h", topic, partition)
}
```
Consume
The code is the same as SimpleRedisDelayQueue's Consume; we just add a `partition` parameter to Consume to specify which partition to consume.
```go
func (q *PartitionRedisDelayQueue) Consume(topic string, batchSize, partition int, fn func(msg *Msg) error) {
	// ...
}
```
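For illustration, one way to cover every partition is to start one consumer goroutine per partition (the partition count and the q instance below are assumptions):

```go
// Hypothetical usage: one consumer goroutine per partition.
const numPartitions = 4 // assumed; must match the producer side

for p := 0; p < numPartitions; p++ {
	go q.Consume("order-close", 10, p, func(msg *Msg) error {
		// handle the message for this partition
		return nil
	})
}
```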
Existing problems
A big problem is that we have to specify partitions manually rather than having them assigned automatically. For Push this is easy to solve: a hash algorithm such as murmur3 can map keys to partitions. For consumers it is more complicated, because we would have to track which partitions are already being consumed. If you really need such complex scenarios, it is recommended to use a message queue such as RocketMQ or Kafka.
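As a sketch of the Push-side idea, the partition could be derived from the key with murmur3, assuming the github.com/spaolacci/murmur3 package and a fixed partition count:

```go
import "github.com/spaolacci/murmur3"

// partitionForKey maps a message key to one of numPartitions partitions
// by hashing it with murmur3, so the same key always lands in the same
// partition.
func partitionForKey(key string, numPartitions int) int {
	return int(murmur3.Sum32([]byte(key)) % uint32(numPartitions))
}
```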
Summary
- Redis's ZSet makes it easy to implement a high-performance delay queue
- However, a delay queue built on Redis's ZSet is not suitable for large message backlogs, and adding automatic partition assignment for consumers makes it considerably more complex
- It is suitable for scenarios where the message volume is modest and the requirements are simple
- If you need large message backlogs and stable multi-consumer support, use RocketMQ, which comes with built-in delayed messages
This concludes this hands-on article on implementing a delay queue with Go and Redis. For more on Go delay queues, please check out my previous articles, and I hope you will continue to support me!